qpid-commits mailing list archives

From acon...@apache.org
Subject svn commit: r1701109 - in /qpid/trunk/qpid/cpp: ./ src/ src/qpid/broker/ src/qpid/broker/amqp_0_10/ src/qpid/broker/windows/ src/qpid/client/ src/qpid/ha/ src/qpid/sys/ src/tests/
Date Thu, 03 Sep 2015 18:59:06 GMT
Author: aconway
Date: Thu Sep  3 18:59:05 2015
New Revision: 1701109

URL: http://svn.apache.org/r1701109
Log:
QPID-5855 - Simplified HA transaction logic.

Removed the complex and incorrect HA+TX logic and reverted to the following limitation:

You can use transactions in an HA cluster, but there are limitations on the
transactional guarantees. Transactions function normally with the *primary*
broker but replication to the backups is not covered by the atomic guarantee.

The following situations are all safe:
- Client rolls back a transaction.
- Client successfully commits a transaction.
- Primary fails during a transaction *before* the client sends a commit.
- Transaction contains only one message.

The problem case is when all of the following occur:
- Transaction contains multiple actions (enqueues or dequeues).
- Primary fails between the client sending commit and receiving commit-complete.

In this case it is possible that only part of the transaction was replicated to
the backups, so on fail-over partial transaction results may be visible.
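
The limitation described above can be illustrated with a small model. This is
only a sketch, not code from Qpid: the names `replicate` and `is_partial`, and
the idea that the backup receives committed actions one at a time in order,
are assumptions made for illustration.

```python
def replicate(actions, fail_after=None):
    """Return the actions a backup has received (hypothetical model).

    actions: list of (op, queue, message) tuples committed on the primary.
    fail_after: number of actions replicated before the primary fails,
                or None if the primary survives until replication completes.
    """
    if fail_after is None:
        return list(actions)
    return list(actions[:fail_after])


def is_partial(actions, replicated):
    """True if the backup holds some, but not all, of the transaction."""
    return 0 < len(replicated) < len(actions)


tx = [("enqueue", "b", "x"), ("enqueue", "b", "y"), ("dequeue", "a", "x")]

# Primary survives: the backup eventually sees the whole transaction.
assert not is_partial(tx, replicate(tx))

# Primary fails after one of three actions was replicated: partial result.
assert is_partial(tx, replicate(tx, fail_after=1))

# A single-action transaction is either fully replicated or not at all.
single = tx[:1]
assert not any(is_partial(single, replicate(single, fail_after=n)) for n in (0, 1))
```

In this model a partial result needs at least two actions and a failure
between the first and last replicated action, which matches the two
conditions listed in the problem case.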

Removed:
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/PrimaryTxObserver.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/TxReplicator.h
Modified:
    qpid/trunk/qpid/cpp/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/CMakeLists.txt
    qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Event.h
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
    qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
    qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
    qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
    qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
    qpid/trunk/qpid/cpp/src/tests/ha_test.py
    qpid/trunk/qpid/cpp/src/tests/ha_tests.py

Modified: qpid/trunk/qpid/cpp/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/CMakeLists.txt?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/CMakeLists.txt Thu Sep  3 18:59:05 2015
@@ -60,7 +60,7 @@ enable_testing()
 include (CTest)
 
 if (MSVC)
-  # Change warning C4996 from level 1 to level 4. These are real and shouldn't
+  # Chaxnge warning C4996 from level 1 to level 4. These are real and shouldn't
   # be completely ignored, but they're pretty well checked out and will throw
   # a run-time error if violated.
   # "warning C4996: 'std::equal': Function call with parameters that may

Modified: qpid/trunk/qpid/cpp/src/CMakeLists.txt
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/CMakeLists.txt?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/CMakeLists.txt (original)
+++ qpid/trunk/qpid/cpp/src/CMakeLists.txt Thu Sep  3 18:59:05 2015
@@ -552,8 +552,6 @@ if (BUILD_HA)
         qpid/ha/Primary.cpp
         qpid/ha/Primary.h
         qpid/ha/PrimaryQueueLimits.h
-        qpid/ha/PrimaryTxObserver.cpp
-        qpid/ha/PrimaryTxObserver.h
         qpid/ha/QueueGuard.cpp
         qpid/ha/QueueGuard.h
         qpid/ha/QueueReplicator.cpp
@@ -571,10 +569,6 @@ if (BUILD_HA)
         qpid/ha/StandAlone.h
         qpid/ha/StatusCheck.cpp
         qpid/ha/StatusCheck.h
-        qpid/ha/TxReplicatingSubscription.cpp
-        qpid/ha/TxReplicatingSubscription.h
-        qpid/ha/TxReplicator.cpp
-        qpid/ha/TxReplicator.h
         qpid/ha/types.cpp
         qpid/ha/types.h
     )

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/QueueFlowLimit.cpp Thu Sep  3 18:59:05 2015
@@ -74,7 +74,7 @@ QueueFlowLimit::QueueFlowLimit(const std
       flowStopSize(_flowStopSize), flowResumeSize(_flowResumeSize),
       flowStopped(false), count(0), size(0), broker(0)
 {
-    QPID_LOG(info, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
+    QPID_LOG(debug, "Queue \"" << queueName << "\": Flow limit created: flowStopCount=" << flowStopCount
              << ", flowResumeCount=" << flowResumeCount
              << ", flowStopSize=" << flowStopSize << ", flowResumeSize=" << flowResumeSize );
 }

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/amqp_0_10/Connection.cpp Thu Sep  3 18:59:05 2015
@@ -411,7 +411,7 @@ bool Connection::doOutput() {
 }
 
 void Connection::sendHeartbeat() {
-	adapter.heartbeat();
+    requestIOProcessing(boost::bind(&ConnectionHandler::heartbeat, &adapter));
 }
 
 void Connection::closeChannel(uint16_t id) {

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/windows/SslProtocolFactory.cpp Thu Sep  3 18:59:05 2015
@@ -263,8 +263,7 @@ void SslProtocolFactory::establishedComm
                                            const qpid::sys::Socket& s) {
     if (tcpNoDelay) {
         s.setTcpNoDelay();
-        QPID_LOG(info,
-                 "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+        QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
     }
 
     async->init(aio, brokerTimer, maxNegotiateTime);

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionSettings.cpp Thu Sep  3 18:59:05 2015
@@ -51,7 +51,7 @@ void ConnectionSettings::configureSocket
 {
     if (tcpNoDelay) {
         socket.setTcpNoDelay();
-        QPID_LOG(info, "Set TCP_NODELAY");
+        QPID_LOG(debug, "Set TCP_NODELAY");
     }
 }
 

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/BrokerReplicator.cpp Thu Sep  3 18:59:05 2015
@@ -21,7 +21,6 @@
 #include "BrokerReplicator.h"
 #include "HaBroker.h"
 #include "QueueReplicator.h"
-#include "TxReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/Queue.h"
@@ -772,10 +771,7 @@ boost::shared_ptr<QueueReplicator> Broke
     const boost::shared_ptr<Queue>& queue)
 {
     if (replicationTest.getLevel(*queue) == ALL) {
-        if (TxReplicator::isTxQueue(queue->getName())) 
-            return TxReplicator::create(haBroker, queue, link);
-        else
-            return QueueReplicator::create(haBroker, queue, link);
+        return QueueReplicator::create(haBroker, queue, link);
     }
     return boost::shared_ptr<QueueReplicator>();
 }
@@ -886,10 +882,6 @@ void BrokerReplicator::disconnectedQueue
     const boost::shared_ptr<QueueReplicator>& qr)
 {
     qr->disconnect();
-    if (TxReplicator::isTxQueue(qr->getQueue()->getName())) {
-        // Transactions are aborted on failover so clean up tx-queues
-        deleteQueue(qr->getQueue()->getName());
-    }
 }
 
 // Called by ConnectionObserver::disconnected, disconnected from the network side.

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Event.cpp Thu Sep  3 18:59:05 2015
@@ -44,14 +44,6 @@ bool isEventKey(const std::string& key)
 
 const string DequeueEvent::KEY(QPID_HA+"de");
 const string IdEvent::KEY(QPID_HA+"id");
-const string TxEnqueueEvent::KEY(QPID_HA+"txenq");
-const string TxDequeueEvent::KEY(QPID_HA+"txdeq");
-const string TxPrepareEvent::KEY(QPID_HA+"txpre");
-const string TxCommitEvent::KEY(QPID_HA+"txcom");
-const string TxRollbackEvent::KEY(QPID_HA+"txrb");
-const string TxPrepareOkEvent::KEY(QPID_HA+"txok");
-const string TxPrepareFailEvent::KEY(QPID_HA+"txno");
-const string TxBackupsEvent::KEY(QPID_HA+"txmem");
 
 broker::Message makeMessage(
     const string& data, const string& destination, const string& routingKey)

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Event.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Event.h?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Event.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Event.h Thu Sep  3 18:59:05 2015
@@ -94,100 +94,6 @@ struct IdEvent : public EventBase<IdEven
     void print(std::ostream& o) const { o << id; }
 };
 
-//// Transaction events
-
-struct TxEnqueueEvent : public EventBase<TxEnqueueEvent> {
-    static const std::string KEY;
-    framing::LongString queue;
-    ReplicationId id;
-
-    TxEnqueueEvent(std::string q=std::string(), ReplicationId i=ReplicationId())
-        : queue(q), id(i) {}
-    void encode(framing::Buffer& b) const { b.put(queue); b.put(id); }
-    void decode(framing::Buffer& b) { b.get(queue); b.get(id); }
-    virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
-    void print(std::ostream& o) const { o << queue.value << " " << id; }
-};
-
-struct TxDequeueEvent : public EventBase<TxDequeueEvent> {
-    static const std::string KEY;
-    framing::LongString queue;
-    ReplicationId id;
-
-    TxDequeueEvent(std::string q=std::string(), ReplicationId r=0) :
-        queue(q), id(r) {}
-    void encode(framing::Buffer& b) const { b.put(queue);b.put(id); }
-    void decode(framing::Buffer& b) { b.get(queue);b.get(id); }
-    virtual size_t encodedSize() const { return queue.encodedSize()+id.encodedSize(); }
-    void print(std::ostream& o) const { o << queue.value << " " << id; }
-};
-
-struct TxPrepareEvent : public EventBase<TxPrepareEvent> {
-    static const std::string KEY;
-    void encode(framing::Buffer&) const {}
-    void decode(framing::Buffer&) {}
-    virtual size_t encodedSize() const { return 0; }
-    void print(std::ostream&) const {}
-};
-
-struct TxCommitEvent : public EventBase<TxCommitEvent> {
-    static const std::string KEY;
-    void encode(framing::Buffer&) const {}
-    void decode(framing::Buffer&) {}
-    virtual size_t encodedSize() const { return 0; }
-    void print(std::ostream&) const {}
-};
-
-struct TxRollbackEvent : public EventBase<TxRollbackEvent> {
-    static const std::string KEY;
-    void encode(framing::Buffer&) const {}
-    void decode(framing::Buffer&) {}
-    virtual size_t encodedSize() const { return 0; }
-    void print(std::ostream&) const {}
-};
-
-struct TxPrepareOkEvent : public EventBase<TxPrepareOkEvent> {
-    static const std::string KEY;
-    types::Uuid broker;
-    TxPrepareOkEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
-
-    void encode(framing::Buffer& b) const {
-        b.putRawData(broker.data(), broker.size());
-    }
-
-    void decode(framing::Buffer& b) {
-        std::string s;
-        b.getRawData(s, broker.size());
-        broker = types::Uuid(&s[0]);
-    }
-    virtual size_t encodedSize() const { return broker.size(); }
-    void print(std::ostream& o) const { o << broker; }
-};
-
-struct TxPrepareFailEvent : public EventBase<TxPrepareFailEvent> {
-    static const std::string KEY;
-    types::Uuid broker;
-    TxPrepareFailEvent(const types::Uuid& b=types::Uuid()) : broker(b) {}
-    void encode(framing::Buffer& b) const { b.putRawData(broker.data(), broker.size()); }
-    void decode(framing::Buffer& b) {
-        std::string s;
-        b.getRawData(s, broker.size());
-        broker = types::Uuid(&s[0]);
-    }
-    virtual size_t encodedSize() const { return broker.size(); }
-    void print(std::ostream& o) const { o << broker; }
-};
-
-struct TxBackupsEvent : public EventBase<TxBackupsEvent> {
-    static const std::string KEY;
-    UuidSet backups;
-    TxBackupsEvent(const UuidSet& s=UuidSet()) : backups(s) {}
-    void encode(framing::Buffer& b) const { b.put(backups); }
-    void decode(framing::Buffer& b) { b.get(backups); }
-    size_t encodedSize() const { return backups.encodedSize(); }
-    void print(std::ostream& o) const { o << backups; }
-};
-
 }} // namespace qpid::ha
 
 #endif  /*!QPID_HA_EVENT_H*/

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.cpp Thu Sep  3 18:59:05 2015
@@ -27,11 +27,11 @@
 #include "RemoteBackup.h"
 #include "ConnectionObserver.h"
 #include "QueueReplicator.h"
-#include "PrimaryTxObserver.h"
 #include "qpid/assert.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/BrokerObserver.h"
 #include "qpid/broker/Connection.h"
+#include "qpid/broker/amqp_0_10/Connection.h"
 #include "qpid/broker/Queue.h"
 #include "qpid/broker/SessionHandlerObserver.h"
 #include "qpid/framing/FieldTable.h"
@@ -77,8 +77,6 @@ class PrimaryBrokerObserver : public bro
     void queueDestroy(const Primary::QueuePtr& q) { primary.queueDestroy(q); }
     void exchangeCreate(const Primary::ExchangePtr& q) { primary.exchangeCreate(q); }
     void exchangeDestroy(const Primary::ExchangePtr& q) { primary.exchangeDestroy(q); }
-    void startTx(const intrusive_ptr<broker::TxBuffer>& tx) { primary.startTx(tx); }
-    void startDtx(const intrusive_ptr<broker::DtxBuffer>& dtx) { primary.startDtx(dtx); }
 
  private:
     Primary& primary;
@@ -268,38 +266,6 @@ void Primary::addReplica(ReplicatingSubs
     replicas[make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue())] = &rs;
 }
 
-void Primary::skipEnqueues(
-    const types::Uuid& backup,
-    const boost::shared_ptr<broker::Queue>& queue,
-    const ReplicationIdSet& ids)
-{
-    sys::Mutex::ScopedLock l(lock);
-    ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
-    if (i != replicas.end()) i->second->skipEnqueues(ids);
-}
-
-void Primary::skipDequeues(
-    const types::Uuid& backup,
-    const boost::shared_ptr<broker::Queue>& queue,
-    const ReplicationIdSet& ids)
-{
-    sys::Mutex::ScopedLock l(lock);
-    ReplicaMap::const_iterator i = replicas.find(make_pair(backup, queue));
-    if (i != replicas.end()) i->second->skipDequeues(ids);
-}
-
-// Called from ReplicatingSubscription::cancel
-void Primary::removeReplica(const ReplicatingSubscription& rs) {
-    boost::shared_ptr<PrimaryTxObserver> tx;
-    {
-        sys::Mutex::ScopedLock l(lock);
-        replicas.erase(make_pair(rs.getBrokerInfo().getSystemId(), rs.getQueue()));
-        TxMap::const_iterator i = txMap.find(rs.getQueue()->getName());
-        if (i != txMap.end()) tx = i->second.lock();
-    }
-    if (tx) tx->cancel(rs);     // Outside of lock.
-}
-
 // NOTE: Called with queue registry lock held.
 void Primary::queueCreate(const QueuePtr& q) {
     // Set replication argument.
@@ -477,22 +443,4 @@ void Primary::setCatchupQueues(const Rem
     backup->startCatchup();
 }
 
-shared_ptr<PrimaryTxObserver> Primary::makeTxObserver(
-    const boost::intrusive_ptr<broker::TxBuffer>& txBuffer)
-{
-    shared_ptr<PrimaryTxObserver> observer =
-        PrimaryTxObserver::create(*this, haBroker, txBuffer);
-    sys::Mutex::ScopedLock l(lock);
-    txMap[observer->getTxQueue()->getName()] = observer;
-    return observer;
-}
-
-void Primary::startTx(const boost::intrusive_ptr<broker::TxBuffer>& txBuffer) {
-    txBuffer->setObserver(makeTxObserver(txBuffer));
-}
-
-void Primary::startDtx(const boost::intrusive_ptr<broker::DtxBuffer>& ) {
-    QPID_LOG(warning, "DTX transactions in a HA cluster are not yet atomic");
-}
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/Primary.h Thu Sep  3 18:59:05 2015
@@ -44,8 +44,6 @@ class Connection;
 class ConnectionObserver;
 class BrokerObserver;
 class SessionHandlerObserver;
-class TxBuffer;
-class DtxBuffer;
 }
 
 namespace sys {
@@ -58,7 +56,6 @@ class ReplicatingSubscription;
 class RemoteBackup;
 class QueueGuard;
 class Membership;
-class PrimaryTxObserver;
 
 /**
  * State associated with a primary broker:
@@ -87,25 +84,12 @@ class Primary : public Role
 
     void readyReplica(const ReplicatingSubscription&);
     void addReplica(ReplicatingSubscription&);
-    void removeReplica(const ReplicatingSubscription&);
-
-    /** Skip replication of ids to queue on backup. */
-    void skipEnqueues(const types::Uuid& backup,
-                      const boost::shared_ptr<broker::Queue>& queue,
-                      const ReplicationIdSet& ids);
-
-    /** Skip replication of dequeue of ids to queue on backup. */
-    void skipDequeues(const types::Uuid& backup,
-                      const boost::shared_ptr<broker::Queue>& queue,
-                      const ReplicationIdSet& ids);
 
     // Called via BrokerObserver
     void queueCreate(const QueuePtr&);
     void queueDestroy(const QueuePtr&);
     void exchangeCreate(const ExchangePtr&);
     void exchangeDestroy(const ExchangePtr&);
-    void startTx(const boost::intrusive_ptr<broker::TxBuffer>&);
-    void startDtx(const boost::intrusive_ptr<broker::DtxBuffer>&);
 
     // Called via ConnectionObserver
     void opened(broker::Connection& connection);
@@ -126,9 +110,6 @@ class Primary : public Role
     typedef sys::unordered_map<UuidQueue, ReplicatingSubscription*,
                                Hasher<UuidQueue> > ReplicaMap;
 
-    // Map of PrimaryTxObservers by tx-queue name
-    typedef sys::unordered_map<std::string, boost::weak_ptr<PrimaryTxObserver> > TxMap;
-
     RemoteBackupPtr backupConnect(const BrokerInfo&, broker::Connection&, sys::Mutex::ScopedLock&);
     void backupDisconnect(RemoteBackupPtr, sys::Mutex::ScopedLock&);
 
@@ -136,8 +117,6 @@ class Primary : public Role
     void checkReady(RemoteBackupPtr);
     void setCatchupQueues(const RemoteBackupPtr&, bool createGuards);
     void deduplicate();
-    boost::shared_ptr<PrimaryTxObserver> makeTxObserver(
-        const boost::intrusive_ptr<broker::TxBuffer>&);
 
     mutable sys::Mutex lock;
     HaBroker& haBroker;
@@ -161,7 +140,6 @@ class Primary : public Role
     boost::shared_ptr<broker::SessionHandlerObserver> sessionHandlerObserver;
     boost::intrusive_ptr<sys::TimerTask> timerTask;
     ReplicaMap replicas;
-    TxMap txMap;
     PrimaryQueueLimits queueLimits;
 };
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/QueueReplicator.cpp Thu Sep  3 18:59:05 2015
@@ -82,28 +82,27 @@ void QueueReplicator::copy(ExchangeRegis
 class QueueReplicator::ErrorListener : public SessionHandler::ErrorListener {
   public:
     ErrorListener(const boost::shared_ptr<QueueReplicator>& qr)
-        : queueReplicator(qr), logPrefix(qr->logPrefix) {}
+        : queueReplicator(qr), logPrefix(qr->logPrefix.prePrefix, qr->logPrefix.get()) {}
 
     void connectionException(framing::connection::CloseCode code, const std::string& msg) {
-        QPID_LOG(debug, logPrefix << framing::createConnectionException(code, msg).what());
+        QPID_LOG(error, logPrefix << "Outgoing " << framing::createConnectionException(code, msg).what());
     }
     void channelException(framing::session::DetachCode code, const std::string& msg) {
-        QPID_LOG(debug, logPrefix << framing::createChannelException(code, msg).what());
+        QPID_LOG(error, logPrefix << "Outgoing " << framing::createChannelException(code, msg).what());
     }
     void executionException(framing::execution::ErrorCode code, const std::string& msg) {
-        QPID_LOG(debug, logPrefix << framing::createSessionException(code, msg).what());
+        QPID_LOG(error, logPrefix  << "Outgoing " << framing::createSessionException(code, msg).what());
     }
     void incomingExecutionException(ErrorCode code, const std::string& msg) {
         boost::shared_ptr<QueueReplicator> qr = queueReplicator.lock();
-        if (qr && !qr->deletedOnPrimary(code, msg))
-            QPID_LOG(error, logPrefix << "Incoming "
-                     << framing::createSessionException(code, msg).what());
+        if (!(qr && qr->deletedOnPrimary(code, msg)))
+            QPID_LOG(error, logPrefix << "Incoming " << framing::createSessionException(code, msg).what());
     }
     void detach() {}
 
   private:
     boost::weak_ptr<QueueReplicator> queueReplicator;
-    const LogPrefix& logPrefix;
+    LogPrefix2 logPrefix;
 };
 
 class QueueReplicator::QueueObserver : public broker::QueueObserver {
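
The rewritten condition in incomingExecutionException changes when the error is
logged, not only how it is formatted. A minimal sketch in Python, treating the
result of locking the weak pointer and the result of deletedOnPrimary as plain
booleans (an assumption that ignores any side effects of the real C++ method),
enumerates the cases where the two conditions disagree:

```python
from itertools import product


def old_logs(qr_alive, deleted_on_primary):
    # Models: if (qr && !qr->deletedOnPrimary(code, msg))
    return qr_alive and not deleted_on_primary


def new_logs(qr_alive, deleted_on_primary):
    # Models: if (!(qr && qr->deletedOnPrimary(code, msg)))
    return not (qr_alive and deleted_on_primary)


# Collect every (qr_alive, deleted_on_primary) pair where behaviour differs.
differences = [
    (alive, deleted)
    for alive, deleted in product((False, True), repeat=2)
    if old_logs(alive, deleted) != new_logs(alive, deleted)
]
```

In this model the two versions differ only when the QueueReplicator is already
gone: the old condition stayed silent, while the new one logs the error.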

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/RemoteBackup.cpp Thu Sep  3 18:59:05 2015
@@ -20,7 +20,6 @@
  */
 #include "RemoteBackup.h"
 #include "QueueGuard.h"
-#include "TxReplicator.h"
 #include "qpid/broker/Broker.h"
 #include "qpid/broker/Connection.h"
 #include "qpid/broker/Queue.h"

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.cpp Thu Sep  3 18:59:05 2015
@@ -24,7 +24,6 @@
 #include "QueueGuard.h"
 #include "QueueSnapshot.h"
 #include "ReplicatingSubscription.h"
-#include "TxReplicatingSubscription.h"
 #include "Primary.h"
 #include "HaBroker.h"
 #include "qpid/assert.h"
@@ -52,7 +51,6 @@ const string ReplicatingSubscription::QP
 const string ReplicatingSubscription::QPID_BROKER_INFO(QPID_HA+"info");
 const string ReplicatingSubscription::QPID_ID_SET(QPID_HA+"ids");
 const string ReplicatingSubscription::QPID_QUEUE_REPLICATOR(QPID_HA+"qrep");
-const string ReplicatingSubscription::QPID_TX_REPLICATOR(QPID_HA+"txrep");
 
 /* Called by SemanticState::consume to create a consumer */
 boost::shared_ptr<broker::SemanticState::ConsumerImpl>
@@ -76,12 +74,6 @@ ReplicatingSubscription::Factory::create
                      parent, name, queue, ack, acquire, exclusive, tag,
                      resumeId, resumeTtl, arguments));
     }
-    else if (type == QPID_TX_REPLICATOR) {
-        rs.reset(new TxReplicatingSubscription(
-                     haBroker,
-                     parent, name, queue, ack, acquire, exclusive, tag,
-                     resumeId, resumeTtl, arguments));
-    }
     if (rs) rs->initialize();
     return rs;
 }
@@ -254,7 +246,6 @@ void ReplicatingSubscription::cancel()
         cancelled = true;
     }
     QPID_LOG(debug, logPrefix << "Cancelled");
-    if (primary) primary->removeReplica(*this);
     getQueue()->getObservers().remove(
         boost::dynamic_pointer_cast<ReplicatingSubscription>(shared_from_this()));
     guard->cancel();
@@ -280,8 +271,6 @@ void ReplicatingSubscription::acknowledg
 void ReplicatingSubscription::sendDequeueEvent(Mutex::ScopedLock& l)
 {
     ReplicationIdSet oldDequeues = dequeues;
-    dequeues -= skipDequeue;    // Don't send skipped dequeues
-    skipDequeue -= oldDequeues; // Forget dequeues that would have been sent.
     if (dequeues.empty()) return;
     QPID_LOG(trace, logPrefix << "Sending dequeues " << dequeues);
     sendEvent(DequeueEvent(dequeues), l);
@@ -333,14 +322,4 @@ bool ReplicatingSubscription::doDispatch
     }
 }
 
-void ReplicatingSubscription::skipEnqueues(const ReplicationIdSet& ids) {
-    Mutex::ScopedLock l(lock);
-    skipEnqueue += ids;
-}
-
-void ReplicatingSubscription::skipDequeues(const ReplicationIdSet& ids) {
-    Mutex::ScopedLock l(lock);
-    skipDequeue += ids;
-}
-
 }} // namespace qpid::ha

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/ReplicatingSubscription.h Thu Sep  3 18:59:05 2015
@@ -97,7 +97,6 @@ class ReplicatingSubscription :
     static const std::string QPID_ID_SET;
     // Replicator types: argument values for QPID_REPLICATING_SUBSCRIPTION argument.
     static const std::string QPID_QUEUE_REPLICATOR;
-    static const std::string QPID_TX_REPLICATOR;
 
     ReplicatingSubscription(HaBroker& haBroker,
                             broker::SemanticState* parent,
@@ -138,9 +137,6 @@ class ReplicatingSubscription :
 
     BrokerInfo getBrokerInfo() const { return info; }
 
-    void skipEnqueues(const ReplicationIdSet& ids);
-    void skipDequeues(const ReplicationIdSet& ids);
-
   protected:
     bool doDispatch();
 
@@ -148,8 +144,7 @@ class ReplicatingSubscription :
     LogPrefix2 logPrefix;
     QueuePosition position;
     ReplicationIdSet dequeues;  // Dequeues to be sent in next dequeue event.
-    ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup and tx enqueues.
-    ReplicationIdSet skipDequeue; // Dequeues to skip: tx dequeues.
+    ReplicationIdSet skipEnqueue; // Enqueues to skip: messages already on backup.
     ReplicationIdSet unready;   // Unguarded, replicated and un-acknowledged.
     bool wasStopped;
     bool ready;

Modified: qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/ha/types.cpp Thu Sep  3 18:59:05 2015
@@ -39,7 +39,6 @@ const string QPID_HA_UUID("qpid.ha-uuid"
 
 const char* QPID_HA_PREFIX = "qpid.ha-";
 const char* QUEUE_REPLICATOR_PREFIX = "qpid.ha-q:";
-const char* TRANSACTION_REPLICATOR_PREFIX = "qpid.ha-tx:";
 
 bool startsWith(const string& name, const string& prefix) {
     return name.compare(0, prefix.size(), prefix) == 0;

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/SocketTransport.cpp Thu Sep  3 18:59:05 2015
@@ -48,7 +48,7 @@ namespace {
     {
         if (opts.tcpNoDelay) {
             s.setTcpNoDelay();
-            QPID_LOG(info, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
+            QPID_LOG(debug, "Set TCP_NODELAY on connection to " << s.getPeerAddress());
         }
 
         AsynchIO* aio = AsynchIO::create

Modified: qpid/trunk/qpid/cpp/src/tests/ha_test.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_test.py?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_test.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_test.py Thu Sep  3 18:59:05 2015
@@ -267,6 +267,8 @@ acl allow all all
         c = self.connect_admin()
         try:
             wait_address(c, queue)
+            if not "msg" in kwargs:
+                kwargs["msg"]=str(self)
             assert_browse_retry(c.session(), queue, expected, **kwargs)
         finally: c.close()
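
The two added lines in this hunk supply a default "msg" keyword only when the
caller did not pass one. A standalone sketch of the same pattern follows; the
helper name `with_default_msg` and the sample values are hypothetical and not
part of ha_test.py.

```python
def with_default_msg(kwargs, owner):
    """Fill in kwargs["msg"] from str(owner) unless the caller set it."""
    if "msg" not in kwargs:
        # str(owner) is only evaluated when no message was supplied.
        kwargs["msg"] = str(owner)
    return kwargs


defaulted = with_default_msg({}, "broker-1")
explicit = with_default_msg({"msg": "custom"}, "broker-1")
```

A caller-supplied message is left untouched, so existing call sites that pass
their own "msg" behave as before.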
 

Modified: qpid/trunk/qpid/cpp/src/tests/ha_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/ha_tests.py?rev=1701109&r1=1701108&r2=1701109&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/ha_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/ha_tests.py Thu Sep  3 18:59:05 2015
@@ -1327,28 +1327,25 @@ class TransactionTests(HaBrokerTest):
         sb.close()
         return tx
 
-    def tx_subscriptions(self, broker):
-        """Return list of queue names for tx subscriptions"""
-        return [q for q in broker.agent.repsub_queues()
-                    if q.startswith("qpid.ha-tx")]
-
     def test_tx_simple_commit(self):
         cluster = HaCluster(self, 2, test_store=True, wait=True)
         tx = self.tx_simple_setup(cluster)
         tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
-
-        # NOTE: backup does not process transactional dequeues until prepare
-        cluster[1].assert_browse_backup("a", ["x","y","z"])
-        cluster[1].assert_browse_backup("b", ['0', '1', '2'])
-
         tx.acknowledge()
+        # Pre transaction - messages are acquired on primary but not yet dequeued
+        # so still there on backup.
+        cluster[0].assert_browse_backup("a", [])
+        cluster[1].assert_browse_backup("a", ['x', 'y', 'z'])
+        for b in cluster:
+            b.assert_browse_backup("b", ['0', '1', '2'])
         tx.commit()
         tx.sync()
         tx.close()
 
+        # Post transaction: all synced.
         for b in cluster:
-            self.assert_simple_commit_outcome(b, tx_queues)
+            b.assert_browse_backup("a", [])
+            b.assert_browse_backup("b", ['0', '1', '2', "x","y","z"])
 
         # Verify non-tx dequeue is replicated correctly
         c = cluster.connect(0, protocol=self.tx_protocol)
@@ -1360,121 +1357,22 @@ class TransactionTests(HaBrokerTest):
         c.close()
         tx.connection.close()
 
-
-    def check_enq_deq(self, cluster, queue, expect):
-        for b in cluster:
-            q = b.agent.getQueue(queue)
-            self.assertEqual(
-                (b.name,)+expect,
-                (b.name, q.msgTotalEnqueues, q.msgTotalDequeues, q.msgTxnEnqueues, q.msgTxnDequeues))
-
-    def test_tx_enq_notx_deq(self):
-        """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
-        cluster = HaCluster(self, 2, test_store=True)
-        c = cluster.connect(0, protocol=self.tx_protocol)
-
-        tx = c.session(transactional=True)
-        c.session().sender("qq;{create:always}").send("m1")
-        tx.sender("qq;{create:always}").send("tx")
-        tx.commit()
-        tx.close()
-        c.session().sender("qq;{create:always}").send("m2")
-        self.check_enq_deq(cluster, 'qq', (3, 0, 1, 0))
-
-        notx = c.session()
-        self.assertEqual(['m1', 'tx', 'm2'], [m.content for m in receiver_iter(notx.receiver('qq'))])
-        notx.acknowledge()
-        self.check_enq_deq(cluster, 'qq', (3, 3, 1, 0))
-        for b in cluster: b.assert_browse_backup('qq', [], msg=b)
-        for b in cluster: self.assert_tx_clean(b)
-
-    def test_tx_enq_notx_deq_qpid_send(self):
-        """Verify that a non-tx dequeue of a tx enqueue is replicated correctly"""
-        cluster = HaCluster(self, 2, test_store=True)
-
-        self.popen(
-            ['qpid-send', '-a', 'qq;{create:always}', '-b', cluster[0].host_port(), '--tx=1',
-             '--content-string=foo']
-        ).assert_exit_ok()
-        for b in cluster: b.assert_browse_backup('qq', ['foo'], msg=b)
-        self.check_enq_deq(cluster, 'qq', (1, 0, 1, 0))
-
-        self.popen(['qpid-receive', '-a', 'qq', '-b', cluster[0].host_port()]).assert_exit_ok()
-        self.check_enq_deq(cluster, 'qq', (1, 1, 1, 0))
-        for b in cluster: b.assert_browse_backup('qq', [], msg=b)
-        for b in cluster: self.assert_tx_clean(b)
-
-    def assert_tx_clean(self, b):
-        """Verify that there are no transaction artifacts
-        (exchanges, queues, subscriptions) on b."""
-        class FunctionCache:    # Call a function and cache the result.
-            def __init__(self, f): self.f, self.value = f, None
-            def __call__(self): self.value = self.f(); return self.value
-
-        txq= FunctionCache(b.agent.tx_queues)
-        assert retry(lambda: not txq()), "%s: unexpected %s"%(b, txq.value)
-        txsub = FunctionCache(lambda: self.tx_subscriptions(b))
-        assert retry(lambda: not txsub()), "%s: unexpected %s"%(b, txsub.value)
-        # TODO aconway 2013-10-15: TX exchanges don't show up in management.
-
-    def assert_simple_commit_outcome(self, b, tx_queues):
-        b.assert_browse_backup("a", [], msg=b)
-        b.assert_browse_backup("b", ['0', '1', '2', 'x', 'y', 'z'], msg=b)
-        # Check for expected actions on the store
-        expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-<begin tx 1>
-<dequeue a x tx=1>
-<dequeue a y tx=1>
-<dequeue a z tx=1>
-<commit tx=1>
-"""
-        self.assertEqual(expect, open_read(b.store_log), msg=b)
-        self.assert_tx_clean(b)
-
     def test_tx_simple_rollback(self):
         cluster = HaCluster(self, 2, test_store=True)
         tx = self.tx_simple_setup(cluster)
         tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
         tx.acknowledge()
         tx.rollback()
-        tx.close()              # For clean test.
-        for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
+
+        for b in cluster:
+            b.assert_browse_backup("a", ["x","y","z"])
+            b.assert_browse_backup("b", ['0', '1', '2'])
+
+        tx.close()
         tx.connection.close()
 
-    def assert_simple_rollback_outcome(self, b, tx_queues):
-        b.assert_browse_backup("a", ["x","y","z"], msg=b)
-        b.assert_browse_backup("b", ['0', '1', '2'], msg=b)
-        # Check for expected actions on the store
-        expect = """<enqueue a x>
-<enqueue a y>
-<enqueue a z>
-"""
-        self.assertEqual(open_read(b.store_log), expect, msg=b)
-        self.assert_tx_clean(b)
 
     def test_tx_simple_failure(self):
-        """Verify we throw TransactionAborted if there is a store error during a transaction"""
-        cluster = HaCluster(self, 3, test_store=True)
-        tx = self.tx_simple_setup(cluster)
-        tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
-        tx.acknowledge()
-        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
-        try:
-            cluster.bounce(0)       # Should cause roll-back
-            tx.connection.session() # Wait for reconnect
-            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
-            self.assertRaises(qm.TransactionAborted, tx.sync)
-            self.assertRaises(qm.TransactionAborted, tx.commit)
-            try: tx.connection.close()
-            except qm.TransactionAborted: pass # Occasionally get exception on close.
-            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
-        finally: l.restore()
-
-    def test_tx_simple_failover(self):
         """Verify we throw TransactionAborted if there is a fail-over during a transaction"""
         cluster = HaCluster(self, 3, test_store=True)
         tx = self.tx_simple_setup(cluster)
@@ -1485,79 +1383,15 @@ class TransactionTests(HaBrokerTest):
         try:
             cluster.bounce(0)       # Should cause roll-back
             tx.connection.session() # Wait for reconnect
-            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
             self.assertRaises(qm.TransactionAborted, tx.sync)
             self.assertRaises(qm.TransactionAborted, tx.commit)
             try: tx.connection.close()
             except qm.TransactionAborted: pass # Occasionally get exception on close.
-            for b in cluster: self.assert_simple_rollback_outcome(b, tx_queues)
-        finally: l.restore()
-
-    def test_tx_unknown_failover(self):
-        """Verify we throw TransactionUnknown if there is a failure during commit"""
-        cluster = HaCluster(self, 3, test_store=True)
-        tx = self.tx_simple_setup(cluster)
-        tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
-        tx.acknowledge()
-        l = LogLevel(ERROR) # Hide expected WARNING log messages from failover.
-        try:
-            os.kill(cluster[2].pid, signal.SIGSTOP) # Delay prepare response
-            class CommitThread(Thread):
-                def run(self):
-                    try: tx.commit()
-                    except Exception, e:
-                        self.error = e
-            t = CommitThread()
-            t.start()            # Commit in progress
-            t.join(timeout=0.01)
-            self.assertTrue(t.isAlive())
-            cluster.bounce(0)
-            os.kill(cluster[2].pid, signal.SIGCONT)
-            t.join()
-            try: raise t.error
-            except qm.TransactionUnknown: pass
-            for b in cluster: self.assert_tx_clean(b)
-            try: tx.connection.close()
-            except qm.TransactionUnknown: pass # Occasionally get exception on close.
+            for b in cluster:
+                b.assert_browse_backup("a", ["x","y","z"])
+                b.assert_browse_backup("b", ['0', '1', '2'])
         finally: l.restore()
 
-    def test_tx_no_backups(self):
-        """Test the special case of a TX where there are no backups"""
-
-        # Test commit
-        cluster = HaCluster(self, 1, test_store=True)
-        tx = self.tx_simple_setup(cluster)
-        tx.acknowledge()
-        tx.commit()
-        tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
-        tx.close()
-        self.assert_simple_commit_outcome(cluster[0], tx_queues)
-
-        # Test rollback
-        cluster = HaCluster(self, 1, test_store=True)
-        tx = self.tx_simple_setup(cluster)
-        tx.sync()
-        tx_queues = cluster[0].agent.tx_queues()
-        tx.acknowledge()
-        tx.rollback()
-        tx.sync()
-        tx.close()
-        self.assert_simple_rollback_outcome(cluster[0], tx_queues)
-
-    def test_tx_backup_fail(self):
-        cluster = HaCluster(self, 2, test_store=True, s_args=[[],["--test-store-name=bang"]])
-        c = cluster[0].connect(protocol=self.tx_protocol)
-        tx = c.session(transactional=True)
-        s = tx.sender("q;{create:always,node:{durable:true}}")
-        for m in ["foo","TEST_STORE_DO bang: throw","bar"]: s.send(qm.Message(m, durable=True))
-        def commit_sync(): tx.commit(); tx.sync()
-        self.assertRaises(qm.TransactionAborted, commit_sync)
-        for b in cluster: b.assert_browse_backup("q", [])
-        self.assertEqual(open_read(cluster[0].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<enqueue q bar tx=1>\n<abort tx=1>\n")
-        self.assertEqual(open_read(cluster[1].store_log), "<begin tx 1>\n<enqueue q foo tx=1>\n<enqueue q TEST_STORE_DO bang: throw tx=1>\n<abort tx=1>\n")
-
     def test_tx_join_leave(self):
         """Test cluster members joining/leaving cluster.
         Also check that tx-queues are cleaned up at end of transaction."""
@@ -1568,13 +1402,11 @@ class TransactionTests(HaBrokerTest):
         tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
         s = tx.sender("q;{create:always}")
         s.send("a", sync=True)
-        self.assertEqual([1,1,1], [len(b.agent.tx_queues()) for b in cluster])
         cluster[1].kill(final=False)
         s.send("b")
         tx.commit()
         tx.connection.close()
         for b in [cluster[0],cluster[2]]:
-            self.assert_tx_clean(b)
             b.assert_browse_backup("q", ["a","b"], msg=b)
         # Joining
         tx = cluster[0].connect(protocol=self.tx_protocol).session(transactional=True)
@@ -1583,7 +1415,6 @@ class TransactionTests(HaBrokerTest):
         cluster.restart(1)      # Not a part of the current transaction.
         tx.commit()
         tx.connection.close()
-        for b in cluster: self.assert_tx_clean(b)
         # The new member is not in the tx but  receives the results normal replication.
         for b in cluster: b.assert_browse_backup("q", ["a", "b", "foo"], msg=b)
 
@@ -1596,7 +1427,6 @@ class TransactionTests(HaBrokerTest):
         for s in sessions:
             sn = s.sender("qq;{create:always,node:{durable:true}}")
             sn.send(qm.Message("foo", durable=True))
-        self.assertEqual(n, len(cluster[1].agent.tx_queues()))
         threads = [ Thread(target=s.commit) for s in sessions]
         for t in threads: t.start()
         cluster[0].ready(timeout=1) # Check for deadlock




