qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r545531 - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/sys/ cpp/src/tests/ python/tests_0-9/
Date Fri, 08 Jun 2007 15:24:13 GMT
Author: gsim
Date: Fri Jun  8 08:24:12 2007
New Revision: 545531

URL: http://svn.apache.org/viewvc?view=rev&rev=545531
Log:
Timeout handling for dtx, plus tests.


Added:
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp   (with props)
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h   (with props)
    incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp   (with props)
Modified:
    incubator/qpid/trunk/qpid/cpp/src/Makefile.am
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
    incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
    incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py

Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun  8 08:24:12 2007
@@ -189,6 +189,7 @@
   qpid/broker/DtxBuffer.cpp \
   qpid/broker/DtxHandlerImpl.cpp \
   qpid/broker/DtxManager.cpp \
+  qpid/broker/DtxTimeout.cpp \
   qpid/broker/DtxWorkRecord.cpp \
   qpid/broker/ExchangeRegistry.cpp \
   qpid/broker/FanOutExchange.cpp \
@@ -206,6 +207,7 @@
   qpid/broker/RecoveredEnqueue.cpp \
   qpid/broker/RecoveredDequeue.cpp \
   qpid/broker/Reference.cpp \
+  qpid/broker/Timer.cpp \
   qpid/broker/TopicExchange.cpp \
   qpid/broker/TxAck.cpp \
   qpid/broker/TxBuffer.cpp \
@@ -243,6 +245,7 @@
   qpid/broker/DtxBuffer.h \
   qpid/broker/DtxHandlerImpl.h \
   qpid/broker/DtxManager.h \
+  qpid/broker/DtxTimeout.h \
   qpid/broker/DtxWorkRecord.h \
   qpid/broker/ExchangeRegistry.h \
   qpid/broker/FanOutExchange.h \
@@ -285,6 +288,7 @@
   qpid/broker/PersistableQueue.h \
   qpid/broker/QueuePolicy.h \
   qpid/broker/RecoveryManagerImpl.h \
+  qpid/broker/Timer.h \
   qpid/broker/TopicExchange.h \
   qpid/broker/TransactionalStore.h \
   qpid/broker/TxAck.h \

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Jun  8 08:24:12 2007
@@ -38,6 +38,7 @@
 #include "Connection.h"
 #include "DeliverableMessage.h"
 #include "DtxAck.h"
+#include "DtxTimeout.h"
 #include "MessageStore.h"
 #include "TxAck.h"
 #include "TxPublish.h"
@@ -154,18 +155,15 @@
                                   % dtxBuffer->getXid() % xid);
     }
 
+    txBuffer.reset();//ops on this channel no longer transactional
+
+    checkDtxTimeout();
     if (fail) {
-        accumulatedAck.clear();
         dtxBuffer->fail();
     } else {
-        TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
-        accumulatedAck.clear();
-        dtxBuffer->enlist(txAck);    
         dtxBuffer->markEnded();
-    }
-    
+    }    
     dtxBuffer.reset();
-    txBuffer.reset();
 }
 
 void Channel::suspendDtx(const std::string& xid){
@@ -173,8 +171,10 @@
         throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") 
                                   % dtxBuffer->getXid() % xid);
     }
+    txBuffer.reset();//ops on this channel no longer transactional
+
+    checkDtxTimeout();
     dtxBuffer->setSuspended(true);
-    txBuffer.reset();
 }
 
 void Channel::resumeDtx(const std::string& xid){
@@ -185,10 +185,20 @@
     if (!dtxBuffer->isSuspended()) {
         throw ConnectionException(503, boost::format("xid %1% not suspended")% xid);
     }
-    dtxBuffer->setSuspended(true);
+
+    checkDtxTimeout();
+    dtxBuffer->setSuspended(false);
     txBuffer = static_pointer_cast<TxBuffer>(dtxBuffer);
 }
 
+void Channel::checkDtxTimeout()
+{
+    if (dtxBuffer->isExpired()) {
+        dtxBuffer.reset();
+        throw DtxTimeoutException();
+    }
+}
+
 void Channel::deliver(
     Message::shared_ptr& msg, const string& consumerTag,
     Queue::shared_ptr& queue, bool ackExpected)
@@ -302,9 +312,14 @@
 void Channel::ack(uint64_t firstTag, uint64_t lastTag){
     if (txBuffer.get()) {
         accumulatedAck.update(firstTag, lastTag);
-
         //TODO: I think the outstanding prefetch size & count should be updated at this point...
         //TODO: ...this may then necessitate dispatching to consumers
+        if (dtxBuffer.get()) {
+            TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked));
+            accumulatedAck.clear();
+            dtxBuffer->enlist(txAck);    
+        }
+
     } else {
         Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
     

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Jun  8 08:24:12 2007
@@ -106,6 +106,8 @@
     void deliver(Message::shared_ptr& msg, const string& tag,
                  Queue::shared_ptr& queue, bool ackExpected);            
     bool checkPrefetch(Message::shared_ptr& msg);
+
+    void checkDtxTimeout();
         
   public:
     Channel(Connection& parent,

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Fri Jun  8 08:24:12 2007
@@ -23,7 +23,8 @@
 using namespace qpid::broker;
 using qpid::sys::Mutex;
 
-DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {}
+DtxBuffer::DtxBuffer(const std::string& _xid) 
+    : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {}
 
 DtxBuffer::~DtxBuffer() {}
 
@@ -68,3 +69,15 @@
     return xid;
 }
 
+void DtxBuffer::timedout()
+{
+    Mutex::ScopedLock locker(lock); 
+    expired = true;
+    fail();
+}
+
+bool DtxBuffer::isExpired()
+{
+    Mutex::ScopedLock locker(lock); 
+    return expired;
+}

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Fri Jun  8 08:24:12 2007
@@ -32,6 +32,7 @@
             bool ended;
             bool suspended;           
             bool failed;
+            bool expired;
 
         public:
             typedef boost::shared_ptr<DtxBuffer> shared_ptr;
@@ -44,6 +45,8 @@
             bool isSuspended();
             void fail();
             bool isRollbackOnly();
+            void timedout();
+            bool isExpired();
             const std::string& getXid();
         };
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Jun  8 08:24:12 2007
@@ -61,21 +61,24 @@
                          bool fail,
                          bool suspend)
 {
-
-    if (fail) {
-        channel.endDtx(xid, true);
-        if (suspend) {
-            throw ConnectionException(503, "End and suspend cannot both be set.");
-        } else {
-            dClient.endOk(XA_RBROLLBACK, context.getRequestId());
-        }
-    } else {
-        if (suspend) {
-            channel.suspendDtx(xid);
+    try {
+        if (fail) {
+            channel.endDtx(xid, true);
+            if (suspend) {
+                throw ConnectionException(503, "End and suspend cannot both be set.");
+            } else {
+                dClient.endOk(XA_RBROLLBACK, context.getRequestId());
+            }
         } else {
-            channel.endDtx(xid, false);
+            if (suspend) {
+                channel.suspendDtx(xid);
+            } else {
+                channel.endDtx(xid, false);
+            }
+            dClient.endOk(XA_OK, context.getRequestId());
         }
-        dClient.endOk(XA_OK, context.getRequestId());
+    } catch (DtxTimeoutException e) {
+        dClient.endOk(XA_RBTIMEOUT, context.getRequestId());        
     }
 }
 
@@ -88,12 +91,16 @@
     if (join && resume) {
         throw ConnectionException(503, "Join and resume cannot both be set.");
     }
-    if (resume) {
-        channel.resumeDtx(xid);
-    } else {
-        channel.startDtx(xid, broker.getDtxManager(), join);
+    try {
+        if (resume) {
+            channel.resumeDtx(xid);
+        } else {
+            channel.startDtx(xid, broker.getDtxManager(), join);
+        }
+        dClient.startOk(XA_OK, context.getRequestId());
+    } catch (DtxTimeoutException e) {
+        dClient.startOk(XA_RBTIMEOUT, context.getRequestId());        
     }
-    dClient.startOk(XA_OK, context.getRequestId());
 }
 
 // DtxCoordinationHandler:
@@ -102,8 +109,12 @@
                              u_int16_t /*ticket*/,
                              const string& xid)
 {
-    bool ok = broker.getDtxManager().prepare(xid);
-    cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+    try {
+        bool ok = broker.getDtxManager().prepare(xid);
+        cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+    } catch (DtxTimeoutException e) {
+        cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId());        
+    }
 }
 
 void DtxHandlerImpl::commit(const MethodContext& context,
@@ -111,8 +122,12 @@
                             const string& xid,
                             bool onePhase)
 {
-    bool ok = broker.getDtxManager().commit(xid, onePhase);
-    cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+    try {
+        bool ok = broker.getDtxManager().commit(xid, onePhase);
+        cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId());
+    } catch (DtxTimeoutException e) {
+        cClient.commitOk(XA_RBTIMEOUT, context.getRequestId());        
+    }
 }
 
 
@@ -120,8 +135,12 @@
                               u_int16_t /*ticket*/,
                               const string& xid )
 {
-    broker.getDtxManager().rollback(xid);
-    cClient.rollbackOk(XA_OK, context.getRequestId());
+    try {
+        broker.getDtxManager().rollback(xid);
+        cClient.rollbackOk(XA_OK, context.getRequestId());
+    } catch (DtxTimeoutException e) {
+        cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId());        
+    }
 }
 
 void DtxHandlerImpl::recover(const MethodContext& context,
@@ -129,8 +148,6 @@
                              bool /*startscan*/,
                              u_int32_t /*endscan*/ )
 {
-    //TODO
-
     //TODO: what do startscan and endscan actually mean?
 
     // response should hold on key value pair with key = 'xids' and
@@ -171,19 +188,21 @@
     throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid);
 }
 
-void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/,
-                                const string& /*xid*/ )
+void DtxHandlerImpl::getTimeout(const MethodContext& context,
+                                const string& xid)
 {
-    //TODO
+    uint32_t timeout = broker.getDtxManager().getTimeout(xid);
+    cClient.getTimeoutOk(timeout, context.getRequestId());    
 }
 
 
-void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/,
+void DtxHandlerImpl::setTimeout(const MethodContext& context,
                                 u_int16_t /*ticket*/,
-                                const string& /*xid*/,
-                                u_int32_t /*timeout*/ )
+                                const string& xid,
+                                u_int32_t timeout)
 {
-    //TODO
+    broker.getDtxManager().setTimeout(xid, timeout);
+    cClient.setTimeoutOk(context.getRequestId());    
 }
 
 

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Jun  8 08:24:12 2007
@@ -19,6 +19,8 @@
  *
  */
 #include "DtxManager.h"
+#include "DtxTimeout.h"
+#include "qpid/log/Statement.h"
 #include <boost/format.hpp>
 #include <iostream>
 using qpid::sys::Mutex;
@@ -29,37 +31,52 @@
 
 DtxManager::~DtxManager() {}
 
-void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops)
 {
     createWork(xid)->add(ops);
 }
 
-void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops)
+void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops)
 {
     getWork(xid)->add(ops);
 }
 
-void DtxManager::recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
+void DtxManager::recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops)
 {
     createWork(xid)->recover(txn, ops);
 }
 
 bool DtxManager::prepare(const std::string& xid) 
 { 
-    return getWork(xid)->prepare();
+    try {
+        return getWork(xid)->prepare();
+    } catch (DtxTimeoutException& e) {
+        remove(xid);
+        throw e;
+    }
 }
 
 bool DtxManager::commit(const std::string& xid, bool onePhase) 
 { 
-    bool result = getWork(xid)->commit(onePhase);
-    remove(xid);
-    return result;
+    try {
+        bool result = getWork(xid)->commit(onePhase);
+        remove(xid);
+        return result;
+    } catch (DtxTimeoutException& e) {
+        remove(xid);
+        throw e;
+    }
 }
 
 void DtxManager::rollback(const std::string& xid) 
 { 
-    getWork(xid)->rollback();
-    remove(xid);
+    try {
+        getWork(xid)->rollback();
+        remove(xid);
+    } catch (DtxTimeoutException& e) {
+        remove(xid);
+        throw e;
+    }
 }
 
 DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid)
@@ -83,7 +100,7 @@
     }
 }
 
-DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid)
+DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid)
 {
     Mutex::ScopedLock locker(lock); 
     WorkMap::iterator i = work.find(xid);
@@ -91,5 +108,50 @@
         throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an existing xid)!") % xid);
     } else {
         return work.insert(xid, new DtxWorkRecord(xid, store)).first;
+    }
+}
+
+void DtxManager::setTimeout(const std::string& xid, uint32_t secs)
+{
+    WorkMap::iterator record = getWork(xid);
+    DtxTimeout::shared_ptr timeout = record->getTimeout();
+    if (timeout.get()) {
+        if (timeout->timeout == secs) return;//no need to do anything further if timeout hasn't changed
+        timeout->cancelled = true;
+    }
+    timeout = DtxTimeout::shared_ptr(new DtxTimeout(secs, *this, xid));
+    record->setTimeout(timeout);
+    timer.add(boost::static_pointer_cast<TimerTask>(timeout));
+    
+}
+
+uint32_t DtxManager::getTimeout(const std::string& xid)
+{
+    DtxTimeout::shared_ptr timeout = getWork(xid)->getTimeout();
+    return !timeout ? 0 : timeout->timeout;
+}
+
+void DtxManager::timedout(const std::string& xid)
+{
+    Mutex::ScopedLock locker(lock); 
+    WorkMap::iterator i = work.find(xid);
+    if (i == work.end()) {
+        QPID_LOG(warning, "Transaction timeout failed: no record for xid");
+    } else {
+        i->timedout();
+        //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion?
+        //timer.add(TimerTask::shared_ptr(new DtxCleanup(60*30/*30 mins*/, *this, xid)));
+    }
+}
+
+DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), mgr(_mgr), xid(_xid) {}
+
+void DtxManager::DtxCleanup::fire()
+{
+    try {
+        mgr.remove(xid);
+    } catch (ConnectionException& e) {
+        //assume it was explicitly cleaned up after a call to prepare, commit or rollback
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri Jun  8 08:24:12 2007
@@ -24,6 +24,7 @@
 #include <boost/ptr_container/ptr_map.hpp>
 #include "DtxBuffer.h"
 #include "DtxWorkRecord.h"
+#include "Timer.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
@@ -34,23 +35,37 @@
 class DtxManager{
     typedef boost::ptr_map<std::string, DtxWorkRecord> WorkMap;
 
+
+    struct DtxCleanup : public TimerTask
+    {
+        DtxManager& mgr;
+        const std::string& xid;
+        
+        DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid);  
 
+        void fire();
+    };
+
     WorkMap work;
     TransactionalStore* const store;
     qpid::sys::Mutex lock;
+    Timer timer;
 
     void remove(const std::string& xid);
     WorkMap::iterator getWork(const std::string& xid);
-    WorkMap::iterator createWork(std::string& xid);
+    WorkMap::iterator createWork(std::string xid);
 
 public:
     DtxManager(TransactionalStore* const store);
     ~DtxManager();
-    void start(std::string xid, DtxBuffer::shared_ptr work);
-    void join(std::string xid, DtxBuffer::shared_ptr work);
-    void recover(std::string xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
+    void start(const std::string& xid, DtxBuffer::shared_ptr work);
+    void join(const std::string& xid, DtxBuffer::shared_ptr work);
+    void recover(const std::string& xid, std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr work);
     bool prepare(const std::string& xid);
     bool commit(const std::string& xid, bool onePhase);
     void rollback(const std::string& xid);
+    void setTimeout(const std::string& xid, uint32_t secs);
+    uint32_t getTimeout(const std::string& xid);
+    void timedout(const std::string& xid);
 };
 
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp Fri Jun  8 08:24:12 2007
@@ -0,0 +1,35 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "DtxTimeout.h"
+#include "DtxManager.h"
+#include "qpid/sys/Time.h"
+
+using namespace qpid::broker;
+
+DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) 
+    : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), timeout(_timeout), mgr(_mgr), xid(_xid) 
+{
+}
+
+void DtxTimeout::fire()
+{
+    mgr.timedout(xid);
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h Fri Jun  8 08:24:12 2007
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _DtxTimeout_
+#define _DtxTimeout_
+
+#include "qpid/Exception.h"
+#include "Timer.h"
+
+namespace qpid {
+namespace broker {
+
+class DtxManager;
+
+
+struct DtxTimeoutException : public Exception 
+{
+    DtxTimeoutException() {}
+};
+
+
+struct DtxTimeout : public TimerTask
+{
+    typedef boost::shared_ptr<DtxTimeout> shared_ptr;
+    const uint32_t timeout;
+    DtxManager& mgr;
+    const std::string xid;
+    
+    DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid);    
+    void fire();
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Jun  8 08:24:12 2007
@@ -27,9 +27,14 @@
 using namespace qpid::broker;
 
 DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : 
-    xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {}
+    xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {}
 
-DtxWorkRecord::~DtxWorkRecord() {}
+DtxWorkRecord::~DtxWorkRecord() 
+{
+    if (timeout.get()) {  
+        timeout->cancelled = true;
+    }
+}
 
 bool DtxWorkRecord::prepare()
 {
@@ -110,6 +115,9 @@
 void DtxWorkRecord::add(DtxBuffer::shared_ptr ops)
 {
     Mutex::ScopedLock locker(lock); 
+    if (expired) {
+        throw DtxTimeoutException();
+    }
     if (completed) {
         throw ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid);
     }
@@ -118,6 +126,9 @@
 
 bool DtxWorkRecord::check()
 {
+    if (expired) {
+        throw DtxTimeoutException();
+    }
     if (!completed) {
         //iterate through all DtxBuffers and ensure they are all ended
         for (Work::iterator i = work.begin(); i != work.end(); i++) {
@@ -148,4 +159,19 @@
     ops->markEnded();
     completed = true;
     prepared = true;
+}
+
+void DtxWorkRecord::timedout()
+{
+    Mutex::ScopedLock locker(lock); 
+    expired = true;
+    rolledback = true;
+    if (!completed) {
+        for (Work::iterator i = work.begin(); i != work.end(); i++) {
+            if (!(*i)->isEnded()) {
+                (*i)->timedout();
+            }
+        }
+    }
+    abort();
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Jun  8 08:24:12 2007
@@ -25,6 +25,7 @@
 #include <functional>
 #include <vector>
 #include "DtxBuffer.h"
+#include "DtxTimeout.h"
 #include "TransactionalStore.h"
 #include "qpid/framing/amqp_types.h"
 #include "qpid/sys/Mutex.h"
@@ -46,6 +47,8 @@
     bool completed;
     bool rolledback;
     bool prepared;
+    bool expired;
+    DtxTimeout::shared_ptr timeout;
     Work work;
     std::auto_ptr<TPCTransactionContext> txn;
     qpid::sys::Mutex lock;
@@ -61,6 +64,9 @@
     void rollback();
     void add(DtxBuffer::shared_ptr ops);
     void recover(std::auto_ptr<TPCTransactionContext> txn, DtxBuffer::shared_ptr ops);
+    void timedout();
+    void setTimeout(DtxTimeout::shared_ptr t) { timeout = t; }
+    DtxTimeout::shared_ptr getTimeout() { return timeout; }
 };
 
 }

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp Fri Jun  8 08:24:12 2007
@@ -0,0 +1,100 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "Timer.h"
+#include <iostream>
+
+using qpid::sys::AbsTime;
+using qpid::sys::Duration;
+using qpid::sys::Monitor;
+using qpid::sys::Thread;
+using namespace qpid::broker;
+
+TimerTask::TimerTask(Duration timeout) : time(AbsTime::now(), timeout), cancelled(false) {}
+TimerTask::TimerTask(AbsTime _time) : time(_time), cancelled(false) {}
+TimerTask::~TimerTask(){}
+
+Timer::Timer() : active(false) 
+{
+    start();
+}
+
+Timer::~Timer() 
+{
+    stop();
+}
+
+void Timer::run()
+{
+    Monitor::ScopedLock l(monitor);
+    while(active){
+        if (tasks.empty()) {
+            monitor.wait();
+        } else {
+            TimerTask::shared_ptr t = tasks.top();
+            if (t->cancelled) {
+                tasks.pop();
+            } else if(t->time < AbsTime::now()) {
+                tasks.pop();
+                t->fire();
+            } else {
+                monitor.wait(t->time);
+            }
+        }
+    }
+}
+
+void Timer::add(TimerTask::shared_ptr task)
+{
+    Monitor::ScopedLock l(monitor);
+    tasks.push(task);
+    monitor.notify();
+}
+
+void Timer::start()
+{
+    Monitor::ScopedLock l(monitor);
+    if (!active) {
+        active = true;
+        runner = std::auto_ptr<Thread>(new Thread(this));
+    }
+}
+
+void Timer::stop()
+{
+    signalStop();
+    if (runner.get()) {
+        runner->join();
+        runner.reset();
+    }
+}
+void Timer::signalStop()
+{
+    Monitor::ScopedLock l(monitor);
+    if (active) {
+        active = false;
+        monitor.notifyAll();
+    }
+}
+
+bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const
+{
+    return a.get() && b.get() && a->time > b->time;
+}

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (added)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h Fri Jun  8 08:24:12 2007
@@ -0,0 +1,76 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#ifndef _Timer_
+#define _Timer_
+
+#include <memory>
+#include <queue>
+#include <boost/shared_ptr.hpp>
+#include "qpid/sys/Monitor.h"
+#include "qpid/sys/Thread.h"
+#include "qpid/sys/Runnable.h"
+
+namespace qpid {
+namespace broker {
+
+struct TimerTask  // Abstract unit of work scheduled on a Timer; subclasses implement fire().
+{
+    typedef boost::shared_ptr<TimerTask> shared_ptr;
+
+    const qpid::sys::AbsTime time;  // compared against AbsTime::now() by Timer::run() to decide when to fire
+    volatile bool cancelled;  // when set, Timer::run() drops the task without calling fire()
+
+    TimerTask(qpid::sys::Duration timeout);  // presumably due 'timeout' after construction - definition not shown here, confirm
+    TimerTask(qpid::sys::AbsTime time);
+    virtual ~TimerTask();
+    virtual void fire() = 0;  // called by Timer::run() once the task is due
+};
+
+ struct Later  // Comparator supplied to the priority_queue of tasks held by Timer.
+ {
+     bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr&
b) const;
+ };
+
+class Timer : private qpid::sys::Runnable  // Holds a queue of TimerTasks and a runner thread; run() is the Runnable body.
+{
+    qpid::sys::Monitor monitor;            // locked around accesses to 'tasks' and 'active'; also used to wake run()
+    std::priority_queue<TimerTask::shared_ptr, std::vector<TimerTask::shared_ptr>,
Later> tasks;  // pending tasks, ordered by the Later comparator
+    std::auto_ptr<qpid::sys::Thread> runner;  // thread created by start(); joined and released by stop()
+    bool active;  // run() keeps looping while this is true
+
+    void run();
+    void signalStop();
+
+public:
+    Timer();
+    ~Timer();
+
+    void add(TimerTask::shared_ptr task);
+    void start();
+    void stop();
+
+};
+
+}
+}
+
+
+#endif

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Fri Jun  8 08:24:12 2007
@@ -44,6 +44,9 @@
 	 
 	static AbsTime now();
 	inline static AbsTime FarFuture();
+
+        friend bool operator<(const AbsTime& a, const AbsTime& b);
+        friend bool operator>(const AbsTime& a, const AbsTime& b);
 };
 
 class Duration {
@@ -66,6 +69,9 @@
 AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits<int64_t>::max();
return ff;}
 
 inline AbsTime now() { return AbsTime::now(); }
+
+inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns <
b.time_ns; }  // orders AbsTime values by their time_ns field
+inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns >
b.time_ns; }  // mirror of operator<, on the same field
 
 Duration::Duration(int64_t time0) :
 	nanosecs(time0)

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jun  8 08:24:12 2007
@@ -32,6 +32,7 @@
   QueueRegistryTest	\
   QueueTest		\
   QueuePolicyTest	\
+  TimerTest		\
   TopicExchangeTest	\
   TxAckTest		\
   TxBufferTest		\

Added: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp?view=auto&rev=545531
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (added)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp Fri Jun  8 08:24:12 2007
@@ -0,0 +1,128 @@
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "qpid/broker/Timer.h"
+#include "qpid/sys/Monitor.h"
+#include "qpid_test_plugin.h"
+#include <math.h>
+#include <iostream>
+#include <memory>
+#include <boost/format.hpp>
+
+using namespace qpid::broker;
+using namespace qpid::sys;
+using boost::dynamic_pointer_cast;
+
+class TimerTest : public CppUnit::TestCase  // CppUnit suite checking the order and timing in which Timer fires tasks.
+{
+    CPPUNIT_TEST_SUITE(TimerTest);
+    CPPUNIT_TEST(testGeneral);
+    CPPUNIT_TEST_SUITE_END();
+
+    class Counter  // Mutex-protected sequence generator shared by all tasks in a test.
+    {
+        Mutex lock;
+        uint counter;
+    public:
+        Counter() : counter(0) {}
+        uint next()
+        {
+            Mutex::ScopedLock l(lock);
+            return ++counter;  // first call returns 1
+        }
+    };
+    
+    class TestTask : public TimerTask  // Task that records when, and in which order, it was fired.
+    {
+        const AbsTime start;  // captured at construction
+        const Duration expected;  // the timeout the task was created with
+        AbsTime end;  // set to now() in fire(); initialised to 'start'
+        bool fired;
+        uint position;  // NOTE(review): not initialised by the constructor; only assigned in fire()
+        Monitor monitor;
+        Counter& counter;
+
+    public:
+        TestTask(Duration timeout, Counter& _counter) 
+            : TimerTask(timeout), start(now()), expected(timeout), end(start), fired(false),
counter(_counter) {}
+
+        void fire()
+        {
+            Monitor::ScopedLock l(monitor);
+            fired = true;
+            position = counter.next();  // 1-based order in which this task fired
+            end = now();
+            monitor.notify();  // wakes a thread blocked in wait()
+        }
+
+        void check(uint expected_position, uint64_t tolerance = 500 * TIME_MSEC)  // asserts the task fired, in the expected order, within 'tolerance' of its timeout
+        {
+            Monitor::ScopedLock l(monitor);
+            CPPUNIT_ASSERT(fired);
+            CPPUNIT_ASSERT_EQUAL(expected_position, position);
+            Duration actual(start, end);
+            uint64_t difference = abs(expected - actual);  // NOTE(review): which abs overload is chosen depends on the headers in scope - confirm it is 64-bit safe
+            std::string msg(boost::lexical_cast<std::string>(boost::format("tolerance
= %1%, difference = %2%") % tolerance % difference));
+            CPPUNIT_ASSERT_MESSAGE(msg, difference < tolerance);
+        }
+
+        void wait(Duration d)
+        {
+            Monitor::ScopedLock l(monitor);
+            monitor.wait(AbsTime(now(), d));  // NOTE(review): 'fired' is not checked first, so a notify issued before this call would not be seen
+        }
+    };
+
+    class DummyRunner : public Runnable  // not referenced anywhere else in this class
+    {
+    public:
+        void run() {}
+    };
+
+public:
+
+    void testGeneral()
+    {
+        Counter counter;
+        Timer timer;  // NOTE(review): start() is never called here; presumably Timer's constructor starts the thread - confirm
+        TestTask::shared_ptr task1(new TestTask(Duration(3 * TIME_SEC), counter));  // TestTask::shared_ptr is the typedef inherited from TimerTask
+        TestTask::shared_ptr task2(new TestTask(Duration(1 * TIME_SEC), counter));
+        TestTask::shared_ptr task3(new TestTask(Duration(4 * TIME_SEC), counter));
+        TestTask::shared_ptr task4(new TestTask(Duration(2 * TIME_SEC), counter));
+        
+        timer.add(task1);  // tasks are added out of timeout order on purpose: 3s, 1s, 4s, 2s
+        timer.add(task2);
+        timer.add(task3);
+        timer.add(task4);
+        
+        dynamic_pointer_cast<TestTask>(task3)->wait(Duration(6 * TIME_SEC));  // task3 has the longest timeout, so wait on it
+        
+        dynamic_pointer_cast<TestTask>(task1)->check(3);  // expected firing order follows the timeouts: task2, task4, task1, task3
+        dynamic_pointer_cast<TestTask>(task2)->check(1);
+        dynamic_pointer_cast<TestTask>(task3)->check(4);
+        dynamic_pointer_cast<TestTask>(task4)->check(2);
+    }
+};
+
+// Make this test suite a plugin.
+CPPUNIT_PLUGIN_IMPLEMENT();
+CPPUNIT_TEST_SUITE_REGISTRATION(TimerTest);
+

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py?view=diff&rev=545531&r1=545530&r2=545531
==============================================================================
--- incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py (original)
+++ incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py Fri Jun  8 08:24:12 2007
@@ -21,6 +21,7 @@
 from qpid.content import Content
 from qpid.testlib import testrunner, TestBase
 from struct import pack, unpack
+from time import sleep
 
 class DtxTests(TestBase):
     """
@@ -37,6 +38,7 @@
     """
 
     XA_RBROLLBACK = 1
+    XA_RBTIMEOUT = 2
     XA_OK = 8
 
     def test_simple_commit(self):
@@ -450,6 +452,51 @@
 
         self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags)
         channel1.dtx_coordination_rollback(xid=tx)
+
+    def test_get_timeout(self):
+        """        
+        Check that get-timeout returns the correct value, (and that a
+        transaction with a timeout can complete normally)        
+        """
+        channel = self.channel
+        tx = self.xid("dummy")
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout)  # before set-timeout is called, 0 is expected
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=60)
+        self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout)  # get-timeout must report the value just set
+        self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags)
+        self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags)   
    
+        
+    def test_set_timeout(self):
+        """        
+        Test the timeout of a transaction results in the expected
+        behaviour        
+        """
+        #open new channel to allow self.channel to be used in checking the queue
+        channel = self.client.channel(2)
+        channel.channel_open()
+        #setup:
+        tx = self.xid("dummy")
+        channel.queue_declare(queue="queue-a", exclusive=True)
+        channel.queue_declare(queue="queue-b", exclusive=True)
+        channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage")
+
+        channel.dtx_demarcation_select()
+        channel.dtx_demarcation_start(xid=tx)
+        self.swap(channel, "queue-a", "queue-b")
+        channel.dtx_coordination_set_timeout(xid=tx, timeout=2)
+        sleep(3)  # sleep past the timeout set above (timeout presumably in seconds - confirm)
+        #check that the work has been rolled back already
+        self.assertMessageCount(1, "queue-a")
+        self.assertMessageCount(0, "queue-b")
+        self.assertMessageId("timeout", "queue-a")
+        #check the correct codes are returned when we try to complete the txn
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags)
+        self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags)
       
+
+
 
     def test_recover(self):
         """



Mime
View raw message