qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r1163347 - in /qpid/trunk/qpid/cpp: src/qpid/broker/ src/qpid/cluster/ src/tests/ xml/
Date Tue, 30 Aug 2011 19:35:36 GMT
Author: aconway
Date: Tue Aug 30 19:35:36 2011
New Revision: 1163347

URL: http://svn.apache.org/viewvc?rev=1163347&view=rev
Log:
QPID-3384: DTX transactions - replicate suspended transactions.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
    qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
    qpid/trunk/qpid/cpp/src/tests/brokertest.py
    qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
    qpid/trunk/qpid/cpp/xml/cluster.xml

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.h Tue Aug 30 19:35:36 2011
@@ -148,9 +148,10 @@ class SemanticState : private boost::non
         management::Manageable::status_t ManagementMethod (uint32_t methodId, management::Args&
args, std::string& text);
     };
 
+    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
+
   private:
     typedef std::map<std::string, ConsumerImpl::shared_ptr> ConsumerImplMap;
-    typedef std::map<std::string, DtxBuffer::shared_ptr> DtxBufferMap;
 
     SessionContext& session;
     DeliveryAdapter& deliveryAdapter;
@@ -181,6 +182,7 @@ class SemanticState : private boost::non
     void disable(ConsumerImpl::shared_ptr);
 
   public:
+
     SemanticState(DeliveryAdapter&, SessionContext&);
     ~SemanticState();
 
@@ -218,6 +220,7 @@ class SemanticState : private boost::non
     void commit(MessageStore* const store);
     void rollback();
     void selectDtx();
+    bool getDtxSelected() const { return dtxSelected; }
     void startDtx(const std::string& xid, DtxManager& mgr, bool join);
     void endDtx(const std::string& xid, bool fail);
     void suspendDtx(const std::string& xid);
@@ -249,6 +252,7 @@ class SemanticState : private boost::non
     void setDtxBuffer(const DtxBuffer::shared_ptr& dtxb) { dtxBuffer = dtxb; txBuffer
= dtxb; }
     void setAccumulatedAck(const framing::SequenceSet& s) { accumulatedAck = s; }
     void record(const DeliveryRecord& delivery);
+    DtxBufferMap& getSuspendedXids() { return suspendedXids; }
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.cpp Tue Aug 30 19:35:36 2011
@@ -419,7 +419,8 @@ void Connection::sessionState(
     const SequenceNumber& expected,
     const SequenceNumber& received,
     const SequenceSet& unknownCompleted,
-    const SequenceSet& receivedIncomplete)
+    const SequenceSet& receivedIncomplete,
+    bool dtxSelected)
 {
     sessionState().setState(
         replayStart,
@@ -429,7 +430,9 @@ void Connection::sessionState(
         received,
         unknownCompleted,
         receivedIncomplete);
-    QPID_LOG(debug, cluster << " received session state update for " << sessionState().getId());
+    if (dtxSelected) semanticState().selectDtx();
+    QPID_LOG(debug, cluster << " received session state update for "
+             << sessionState().getId());
     // The output tasks will be added later in the update process.
     connection->getOutputTasks().removeAll();
 }
@@ -459,11 +462,14 @@ void Connection::shadowReady(
     output.setSendMax(sendMax);
 }
 
-void Connection::setDtxBuffer(const UpdateReceiver::DtxBuffers::value_type &v) {
+void Connection::setDtxBuffer(const UpdateReceiver::DtxBufferRef& bufRef) {
     broker::DtxManager& mgr = cluster.getBroker().getDtxManager();
-    broker::DtxWorkRecord* record = mgr.getWork(v.first.first); // XID
-    uint32_t index = v.first.second; // Index
-    v.second->setDtxBuffer((*record)[index]);
+    broker::DtxWorkRecord* record = mgr.getWork(bufRef.xid);
+    broker::DtxBuffer::shared_ptr buffer = (*record)[bufRef.index];
+    if (bufRef.suspended)
+        bufRef.semanticState->getSuspendedXids()[bufRef.xid] = buffer;
+    else
+        bufRef.semanticState->setDtxBuffer(buffer);
 }
 
 // Marks the end of the update.
@@ -694,11 +700,12 @@ void Connection::dtxAck() {
     dtxAckRecords.clear();
 }
 
-void Connection::dtxBufferRef(const std::string& xid, uint32_t index) {
-    // Save the association between DtxBuffer and session so we can
-    // set the DtxBuffer on the session at the end of the update
-    // when the DtxManager has been replicated.
-    updateIn.dtxBuffers[std::make_pair(xid, index)] = &semanticState();
+void Connection::dtxBufferRef(const std::string& xid, uint32_t index, bool suspended)
{
+    // Save the association between DtxBuffers and the session so we
+    // can set the DtxBuffers at the end of the update when the
+    // DtxManager has been replicated.
+    updateIn.dtxBuffers.push_back(
+        UpdateReceiver::DtxBufferRef(xid, index, suspended, &semanticState()));
 }
 
 // Sent at end of work record.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Connection.h Tue Aug 30 19:35:36 2011
@@ -124,7 +124,8 @@ class Connection :
                       const framing::SequenceNumber& expected,
                       const framing::SequenceNumber& received,
                       const framing::SequenceSet& unknownCompleted,
-                      const SequenceSet& receivedIncomplete);
+                      const SequenceSet& receivedIncomplete,
+                      bool dtxSelected);
 
     void outputTask(uint16_t channel, const std::string& name);
 
@@ -173,7 +174,7 @@ class Connection :
                   bool expired);
     void dtxEnd();
     void dtxAck();
-    void dtxBufferRef(const std::string& xid, uint32_t index);
+    void dtxBufferRef(const std::string& xid, uint32_t index, bool suspended);
     void dtxWorkRecord(const std::string& xid, bool prepared, uint32_t timeout);
 
     // Encoded exchange replication.

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.cpp Tue Aug 30 19:35:36 2011
@@ -506,7 +506,8 @@ void UpdateClient::updateSession(broker:
         std::max(received, ss->receiverGetExpected().command),
         received,
         ss->receiverGetUnknownComplete(),
-        ss->receiverGetIncomplete()
+        ss->receiverGetIncomplete(),
+        ss->getSemanticState().getDtxSelected()
     );
 
     // Send frames for partial message in progress.
@@ -552,7 +553,6 @@ void UpdateClient::updateUnacked(const b
                                  client::AsyncSession& updateSession)
 {
     if (!dr.isEnded() && dr.isAcquired()) {
-        // FIXME aconway 2011-08-19: should this be assert or if?
         assert(dr.getMessage().payload);
         // If the message is acquired then it is no longer on the
         // updatees queue, put it on the update queue for updatee to pick up.
@@ -621,22 +621,34 @@ class TxOpUpdater : public broker::TxOpC
     ClusterConnectionProxy proxy;
 };
 
+void UpdateClient::updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx,bool suspended)
+{
+    ClusterConnectionProxy proxy(shadowSession);
+    broker::DtxWorkRecord* record =
+        updaterBroker.getDtxManager().getWork(dtx->getXid());
+    proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx), suspended);
+
+}
+
 void UpdateClient::updateTransactionState(broker::SemanticState& s) {
-    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
-    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
     ClusterConnectionProxy proxy(shadowSession);
     proxy.accumulatedAck(s.getAccumulatedAck());
+    broker::TxBuffer::shared_ptr tx = s.getTxBuffer();
+    broker::DtxBuffer::shared_ptr dtx = s.getDtxBuffer();
     if (dtx) {
-        broker::DtxWorkRecord* record =
-            updaterBroker.getDtxManager().getWork(dtx->getXid()); // throws if not found
-        proxy.dtxBufferRef(dtx->getXid(), record->indexOf(dtx));
+        updateBufferRef(dtx, false); // Current transaction.
     } else if (tx) {
-        ClusterConnectionProxy proxy(shadowSession);
         proxy.txStart();
         TxOpUpdater updater(*this, shadowSession, expiry);
         tx->accept(updater);
         proxy.txEnd();
     }
+    for (SemanticState::DtxBufferMap::iterator i = s.getSuspendedXids().begin();
+         i != s.getSuspendedXids().end();
+         ++i)
+    {
+        updateBufferRef(i->second, true);
+    }
 }
 
 void UpdateClient::updateDtxBuffer(const broker::DtxBuffer::shared_ptr& dtx) {

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateClient.h Tue Aug 30 19:35:36 2011
@@ -100,6 +100,7 @@ class UpdateClient : public sys::Runnabl
     void updateBinding(client::AsyncSession&, const std::string& queue, const broker::QueueBinding&
binding);
     void updateConnection(const boost::intrusive_ptr<Connection>& connection);
     void updateSession(broker::SessionHandler& s);
+    void updateBufferRef(const broker::DtxBuffer::shared_ptr& dtx, bool suspended);
     void updateTransactionState(broker::SemanticState& s);
     void updateOutputTask(const sys::OutputTask* task);
     void updateConsumer(const broker::SemanticState::ConsumerImpl::shared_ptr&);

Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/UpdateReceiver.h Tue Aug 30 19:35:36 2011
@@ -40,11 +40,18 @@ class UpdateReceiver {
     /** Management-id for the next shadow connection */
     std::string nextShadowMgmtId;
 
-    /** Relationship between DtxBuffers, identified by xid, index in DtxManager,
-     * and sessions represented by their SemanticState.
+    /** Record the position of a DtxBuffer in the DtxManager (xid + index)
+     * and the association with a session, either suspended or current.
      */
-    typedef std::pair<std::string, uint32_t> DtxBufferRef;
-    typedef std::map<DtxBufferRef, broker::SemanticState* > DtxBuffers;
+    struct DtxBufferRef {
+        std::string xid;
+        uint32_t index;         // Index in WorkRecord in DtxManager
+        bool suspended;         // Is this a suspended or current transaction?
+        broker::SemanticState* semanticState; // Associated session
+        DtxBufferRef(const std::string& x, uint32_t i, bool s, broker::SemanticState*
ss)
+            : xid(x), index(i), suspended(s), semanticState(ss) {}
+    };
+    typedef std::vector<DtxBufferRef> DtxBuffers;
     DtxBuffers dtxBuffers;
 };
 }} // namespace qpid::cluster

Modified: qpid/trunk/qpid/cpp/src/tests/brokertest.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/brokertest.py?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/brokertest.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/brokertest.py Tue Aug 30 19:35:36 2011
@@ -420,6 +420,9 @@ class Cluster:
         self._brokers.append(self.test.broker(self.args+args, name, expect, wait, port=port,
show_cmd=show_cmd))
         return self._brokers[-1]
 
+    def ready(self):
+        for b in self: b.ready()
+
     def start_n(self, count, expect=EXPECT_RUNNING, wait=True, args=[], show_cmd=False):
         for i in range(count): self.start(expect=expect, wait=wait, args=args, show_cmd=show_cmd)
 
@@ -495,6 +498,7 @@ class BrokerTest(TestCase):
     def browse(self, session, queue, timeout=0):
         """Return a list with the contents of each message on queue."""
         r = session.receiver("%s;{mode:browse}"%(queue))
+        r.capacity = 100
         try:
             contents = []
             try:

Modified: qpid/trunk/qpid/cpp/src/tests/cluster_tests.py
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/tests/cluster_tests.py?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/tests/cluster_tests.py (original)
+++ qpid/trunk/qpid/cpp/src/tests/cluster_tests.py Tue Aug 30 19:35:36 2011
@@ -733,11 +733,11 @@ class DtxTestFixture:
         self.session.dtx_select()
         self.consumer = None
 
-    def start(self):
-        self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid).status)
+    def start(self, resume=False):
+        self.test.assertEqual(XA_OK, self.session.dtx_start(xid=self.xid, resume=resume).status)
 
-    def end(self):
-        self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid).status)
+    def end(self, suspend=False):
+        self.test.assertEqual(XA_OK, self.session.dtx_end(xid=self.xid, suspend=suspend).status)
 
     def prepare(self):
         self.test.assertEqual(XA_OK, self.session.dtx_prepare(xid=self.xid).status)
@@ -767,10 +767,9 @@ class DtxTestFixture:
         return msg
 
 
-    def verify(self, cluster, messages):
-        for b in cluster:
-            self.test.assert_browse(b.connect().session(), self.name, messages)
-
+    def verify(self, sessions, messages):
+        for s in sessions:
+            self.test.assert_browse(s, self.name, messages)
 
 class DtxTests(BrokerTest):
 
@@ -783,12 +782,13 @@ class DtxTests(BrokerTest):
         # multiple brokers per test.
 
         cluster=self.cluster(1)
+        sessions = [cluster[0].connect().session()] # For verify
 
         # Transaction that will be open when new member joins, then committed.
         t1 = DtxTestFixture(self, cluster[0], "t1")
         t1.start()
         t1.send(["1", "2"])
-        t1.verify(cluster, [])          # Not visible outside of transaction
+        t1.verify(sessions, [])          # Not visible outside of transaction
 
         # Transaction that will be open when  new member joins, then rolled back.
         t2 = DtxTestFixture(self, cluster[0], "t2")
@@ -801,7 +801,7 @@ class DtxTests(BrokerTest):
         t3.send(["1", "2"])
         t3.end()
         t3.prepare()
-        t1.verify(cluster, [])          # Not visible outside of transaction
+        t1.verify(sessions, [])          # Not visible outside of transaction
 
         # Transaction that will be prepared when  new member joins, then rolled back.
         t4 = DtxTestFixture(self, cluster[0], "t4")
@@ -819,70 +819,99 @@ class DtxTests(BrokerTest):
         t6 = DtxTestFixture(self, cluster[0], "t6")
         t6.send(["a","b","c"])
         t6.start()
-        t6.verify(cluster, ["a","b","c"])
         self.assertEqual(t6.accept().body, "a");
-        t6.verify(cluster, ["b","c"])
 
         # Accept messages in a transaction before/after join then roll back
         t7 = DtxTestFixture(self, cluster[0], "t7")
         t7.send(["a","b","c"])
         t7.start()
-        t7.verify(cluster, ["a","b","c"])
         self.assertEqual(t7.accept().body, "a");
-        t7.verify(cluster, ["b","c"])
+
+        # Suspended transaction across join.
+        t8 = DtxTestFixture(self, cluster[0], "t8")
+        t8.start()
+        t8.send(["x"])
+        t8.end(suspend=True)
 
         # Start new member
         cluster.start()
+        sessions.append(cluster[1].connect().session())
 
         # Commit t1
         t1.send(["3","4"])
-        t1.verify(cluster, [])
+        t1.verify(sessions, [])
         t1.end()
         t1.commit(one_phase=True)
-        t1.verify(cluster, ["1","2","3","4"])
+        t1.verify(sessions, ["1","2","3","4"])
 
         # Rollback t2
         t2.send(["3","4"])
-        t2.verify(cluster, [])
         t2.end()
         t2.rollback()
-        t2.verify(cluster, [])
+        t2.verify(sessions, [])
 
         # Commit t3
-        t3.verify(cluster, [])
         t3.commit(one_phase=False)
-        t3.verify(cluster, ["1","2"])
+        t3.verify(sessions, ["1","2"])
 
         # Rollback t4
-        t4.verify(cluster, [])
         t4.rollback()
-        t4.verify(cluster, [])
+        t4.verify(sessions, [])
 
         # Commit t5
         t5.send(["3","4"])
-        t5.verify(cluster, [])
+        t5.verify(sessions, [])
         t5.end()
         t5.commit(one_phase=True)
-        t5.verify(cluster, ["1","2","3","4"])
+        t5.verify(sessions, ["1","2","3","4"])
 
-        # Commit t7
-        t6.verify(cluster, ["b", "c"])
+        # Commit t6
         self.assertEqual(t6.accept().body, "b");
-        t6.verify(cluster, ["c"])
+        t6.verify(sessions, ["c"])
         t6.end()
         t6.commit(one_phase=True)
-        t6.verify(cluster, ["c"])
         t6.session.close()              # Make sure they're not requeued by the session.
-        t6.verify(cluster, ["c"])
+        t6.verify(sessions, ["c"])
 
         # Rollback t7
-        t7.verify(cluster, ["b", "c"])
         self.assertEqual(t7.accept().body, "b");
-        t7.verify(cluster, ["c"])
         t7.end()
         t7.rollback()
-        t7.verify(cluster, ["a", "b", "c"])
+        t7.verify(sessions, ["a", "b", "c"])
 
+        # Resume t8
+        t8.start(resume=True)
+        t8.send(["y"])
+        t8.end()
+        t8.commit(one_phase=True)
+        t8.verify(sessions, ["x","y"])
+
+    def test_dtx_failover_rollback(self):
+       """Kill a broker during a transaction, verify we roll back correctly"""
+       cluster=self.cluster(1, expect=EXPECT_EXIT_FAIL)
+       cluster.start(expect=EXPECT_RUNNING)
+
+       # Test unprepared at crash
+       t1 = DtxTestFixture(self, cluster[0], "t1")
+       t1.send(["a"])                   # Not in transaction
+       t1.start()
+       t1.send(["b"])                   # In transaction
+
+       # Test prepared at crash
+       t2 = DtxTestFixture(self, cluster[0], "t2")
+       t2.send(["a"])                   # Not in transaction
+       t2.start()
+       t2.send(["b"])                   # In transaction
+       t2.end()
+       t2.prepare()
+
+       # Crash the broker
+       cluster[0].kill()
+
+       # Transactional changes should not appear
+       s = cluster[1].connect().session();
+       self.assert_browse(s, "t1", ["a"])
+       self.assert_browse(s, "t2", ["a"])
 
 class TxTests(BrokerTest):
 

Modified: qpid/trunk/qpid/cpp/xml/cluster.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/xml/cluster.xml?rev=1163347&r1=1163346&r2=1163347&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/xml/cluster.xml (original)
+++ qpid/trunk/qpid/cpp/xml/cluster.xml Tue Aug 30 19:35:36 2011
@@ -227,6 +227,7 @@
     <control name="dtx-buffer-ref" code="0x1D">
       <field name="xid" type="str16"/>
       <field name="index" type="uint32"/>
+      <field name="suspended" type="bit"/>
     </control>
 
     <control name="dtx-work-record" code="0x1E">
@@ -246,6 +247,7 @@
       <field name="received" type="sequence-no"/>	       <!-- Received up to here
(>= expected) -->
       <field name="unknown-completed" type="sequence-set"/>    <!-- Completed but
not known to peer. -->
       <field name="received-incomplete" type="sequence-set"/>  <!-- Received and
incomplete -->
+      <field name="dtx-selected" type="bit"/>
     </control>
 
     <!-- Complete a shadow connection update. -->



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message