Return-Path: Delivered-To: apmail-incubator-qpid-commits-archive@locus.apache.org Received: (qmail 10542 invoked from network); 8 Jun 2007 15:24:37 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 8 Jun 2007 15:24:37 -0000 Received: (qmail 17502 invoked by uid 500); 8 Jun 2007 15:24:40 -0000 Delivered-To: apmail-incubator-qpid-commits-archive@incubator.apache.org Received: (qmail 17480 invoked by uid 500); 8 Jun 2007 15:24:40 -0000 Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: qpid-dev@incubator.apache.org Delivered-To: mailing list qpid-commits@incubator.apache.org Received: (qmail 17467 invoked by uid 99); 8 Jun 2007 15:24:40 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jun 2007 08:24:40 -0700 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 08 Jun 2007 08:24:34 -0700 Received: by eris.apache.org (Postfix, from userid 65534) id 6A03E1A981A; Fri, 8 Jun 2007 08:24:14 -0700 (PDT) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r545531 - in /incubator/qpid/trunk/qpid: cpp/src/ cpp/src/qpid/broker/ cpp/src/qpid/sys/ cpp/src/tests/ python/tests_0-9/ Date: Fri, 08 Jun 2007 15:24:13 -0000 To: qpid-commits@incubator.apache.org From: gsim@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070608152414.6A03E1A981A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: gsim Date: Fri Jun 8 08:24:12 2007 New Revision: 545531 URL: http://svn.apache.org/viewvc?view=rev&rev=545531 Log: Timeout handling for dtx, plus tests. 
Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (with props) incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (with props) incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (with props) Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py Modified: incubator/qpid/trunk/qpid/cpp/src/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/Makefile.am Fri Jun 8 08:24:12 2007 @@ -189,6 +189,7 @@ qpid/broker/DtxBuffer.cpp \ qpid/broker/DtxHandlerImpl.cpp \ qpid/broker/DtxManager.cpp \ + qpid/broker/DtxTimeout.cpp \ qpid/broker/DtxWorkRecord.cpp \ qpid/broker/ExchangeRegistry.cpp \ qpid/broker/FanOutExchange.cpp \ @@ -206,6 +207,7 @@ qpid/broker/RecoveredEnqueue.cpp \ qpid/broker/RecoveredDequeue.cpp \ qpid/broker/Reference.cpp \ + qpid/broker/Timer.cpp \ qpid/broker/TopicExchange.cpp \ qpid/broker/TxAck.cpp \ qpid/broker/TxBuffer.cpp \ @@ -243,6 +245,7 @@ 
qpid/broker/DtxBuffer.h \ qpid/broker/DtxHandlerImpl.h \ qpid/broker/DtxManager.h \ + qpid/broker/DtxTimeout.h \ qpid/broker/DtxWorkRecord.h \ qpid/broker/ExchangeRegistry.h \ qpid/broker/FanOutExchange.h \ @@ -285,6 +288,7 @@ qpid/broker/PersistableQueue.h \ qpid/broker/QueuePolicy.h \ qpid/broker/RecoveryManagerImpl.h \ + qpid/broker/Timer.h \ qpid/broker/TopicExchange.h \ qpid/broker/TransactionalStore.h \ qpid/broker/TxAck.h \ Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.cpp Fri Jun 8 08:24:12 2007 @@ -38,6 +38,7 @@ #include "Connection.h" #include "DeliverableMessage.h" #include "DtxAck.h" +#include "DtxTimeout.h" #include "MessageStore.h" #include "TxAck.h" #include "TxPublish.h" @@ -154,18 +155,15 @@ % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); if (fail) { - accumulatedAck.clear(); dtxBuffer->fail(); } else { - TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); - accumulatedAck.clear(); - dtxBuffer->enlist(txAck); dtxBuffer->markEnded(); - } - + } dtxBuffer.reset(); - txBuffer.reset(); } void Channel::suspendDtx(const std::string& xid){ @@ -173,8 +171,10 @@ throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend") % dtxBuffer->getXid() % xid); } + txBuffer.reset();//ops on this channel no longer transactional + + checkDtxTimeout(); dtxBuffer->setSuspended(true); - txBuffer.reset(); } void Channel::resumeDtx(const std::string& xid){ @@ -185,10 +185,20 @@ if (!dtxBuffer->isSuspended()) { throw ConnectionException(503, boost::format("xid %1% not 
suspended")% xid); } - dtxBuffer->setSuspended(true); + + checkDtxTimeout(); + dtxBuffer->setSuspended(false); txBuffer = static_pointer_cast(dtxBuffer); } +void Channel::checkDtxTimeout() +{ + if (dtxBuffer->isExpired()) { + dtxBuffer.reset(); + throw DtxTimeoutException(); + } +} + void Channel::deliver( Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected) @@ -302,9 +312,14 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){ if (txBuffer.get()) { accumulatedAck.update(firstTag, lastTag); - //TODO: I think the outstanding prefetch size & count should be updated at this point... //TODO: ...this may then necessitate dispatching to consumers + if (dtxBuffer.get()) { + TxOp::shared_ptr txAck(new DtxAck(accumulatedAck, unacked)); + accumulatedAck.clear(); + dtxBuffer->enlist(txAck); + } + } else { Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/BrokerChannel.h Fri Jun 8 08:24:12 2007 @@ -106,6 +106,8 @@ void deliver(Message::shared_ptr& msg, const string& tag, Queue::shared_ptr& queue, bool ackExpected); bool checkPrefetch(Message::shared_ptr& msg); + + void checkDtxTimeout(); public: Channel(Connection& parent, Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp (original) 
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.cpp Fri Jun 8 08:24:12 2007 @@ -23,7 +23,8 @@ using namespace qpid::broker; using qpid::sys::Mutex; -DtxBuffer::DtxBuffer(const std::string& _xid) : xid(_xid), ended(false), suspended(false), failed(false) {} +DtxBuffer::DtxBuffer(const std::string& _xid) + : xid(_xid), ended(false), suspended(false), failed(false), expired(false) {} DtxBuffer::~DtxBuffer() {} @@ -68,3 +69,15 @@ return xid; } +void DtxBuffer::timedout() +{ + Mutex::ScopedLock locker(lock); + expired = true; + fail(); +} + +bool DtxBuffer::isExpired() +{ + Mutex::ScopedLock locker(lock); + return expired; +} Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxBuffer.h Fri Jun 8 08:24:12 2007 @@ -32,6 +32,7 @@ bool ended; bool suspended; bool failed; + bool expired; public: typedef boost::shared_ptr shared_ptr; @@ -44,6 +45,8 @@ bool isSuspended(); void fail(); bool isRollbackOnly(); + void timedout(); + bool isExpired(); const std::string& getXid(); }; } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxHandlerImpl.cpp Fri Jun 8 08:24:12 2007 @@ -61,21 +61,24 @@ bool fail, bool suspend) { - - if (fail) { - channel.endDtx(xid, true); - if (suspend) { - throw ConnectionException(503, "End and suspend cannot both be set."); - 
} else { - dClient.endOk(XA_RBROLLBACK, context.getRequestId()); - } - } else { - if (suspend) { - channel.suspendDtx(xid); + try { + if (fail) { + channel.endDtx(xid, true); + if (suspend) { + throw ConnectionException(503, "End and suspend cannot both be set."); + } else { + dClient.endOk(XA_RBROLLBACK, context.getRequestId()); + } } else { - channel.endDtx(xid, false); + if (suspend) { + channel.suspendDtx(xid); + } else { + channel.endDtx(xid, false); + } + dClient.endOk(XA_OK, context.getRequestId()); } - dClient.endOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + dClient.endOk(XA_RBTIMEOUT, context.getRequestId()); } } @@ -88,12 +91,16 @@ if (join && resume) { throw ConnectionException(503, "Join and resume cannot both be set."); } - if (resume) { - channel.resumeDtx(xid); - } else { - channel.startDtx(xid, broker.getDtxManager(), join); + try { + if (resume) { + channel.resumeDtx(xid); + } else { + channel.startDtx(xid, broker.getDtxManager(), join); + } + dClient.startOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + dClient.startOk(XA_RBTIMEOUT, context.getRequestId()); } - dClient.startOk(XA_OK, context.getRequestId()); } // DtxCoordinationHandler: @@ -102,8 +109,12 @@ u_int16_t /*ticket*/, const string& xid) { - bool ok = broker.getDtxManager().prepare(xid); - cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + try { + bool ok = broker.getDtxManager().prepare(xid); + cClient.prepareOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.prepareOk(XA_RBTIMEOUT, context.getRequestId()); + } } void DtxHandlerImpl::commit(const MethodContext& context, @@ -111,8 +122,12 @@ const string& xid, bool onePhase) { - bool ok = broker.getDtxManager().commit(xid, onePhase); - cClient.commitOk(ok ? XA_OK : XA_RBROLLBACK, context.getRequestId()); + try { + bool ok = broker.getDtxManager().commit(xid, onePhase); + cClient.commitOk(ok ? 
XA_OK : XA_RBROLLBACK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.commitOk(XA_RBTIMEOUT, context.getRequestId()); + } } @@ -120,8 +135,12 @@ u_int16_t /*ticket*/, const string& xid ) { - broker.getDtxManager().rollback(xid); - cClient.rollbackOk(XA_OK, context.getRequestId()); + try { + broker.getDtxManager().rollback(xid); + cClient.rollbackOk(XA_OK, context.getRequestId()); + } catch (DtxTimeoutException e) { + cClient.rollbackOk(XA_RBTIMEOUT, context.getRequestId()); + } } void DtxHandlerImpl::recover(const MethodContext& context, @@ -129,8 +148,6 @@ bool /*startscan*/, u_int32_t /*endscan*/ ) { - //TODO - //TODO: what do startscan and endscan actually mean? // response should hold on key value pair with key = 'xids' and @@ -171,19 +188,21 @@ throw ConnectionException(503, boost::format("Forget is invalid. Branch with xid %1% not heuristically completed!") % xid); } -void DtxHandlerImpl::getTimeout(const MethodContext& /*context*/, - const string& /*xid*/ ) +void DtxHandlerImpl::getTimeout(const MethodContext& context, + const string& xid) { - //TODO + uint32_t timeout = broker.getDtxManager().getTimeout(xid); + cClient.getTimeoutOk(timeout, context.getRequestId()); } -void DtxHandlerImpl::setTimeout(const MethodContext& /*context*/, +void DtxHandlerImpl::setTimeout(const MethodContext& context, u_int16_t /*ticket*/, - const string& /*xid*/, - u_int32_t /*timeout*/ ) + const string& xid, + u_int32_t timeout) { - //TODO + broker.getDtxManager().setTimeout(xid, timeout); + cClient.setTimeoutOk(context.getRequestId()); } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp (original) +++ 
incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.cpp Fri Jun 8 08:24:12 2007 @@ -19,6 +19,8 @@ * */ #include "DtxManager.h" +#include "DtxTimeout.h" +#include "qpid/log/Statement.h" #include #include using qpid::sys::Mutex; @@ -29,37 +31,52 @@ DtxManager::~DtxManager() {} -void DtxManager::start(std::string xid, DtxBuffer::shared_ptr ops) +void DtxManager::start(const std::string& xid, DtxBuffer::shared_ptr ops) { createWork(xid)->add(ops); } -void DtxManager::join(std::string xid, DtxBuffer::shared_ptr ops) +void DtxManager::join(const std::string& xid, DtxBuffer::shared_ptr ops) { getWork(xid)->add(ops); } -void DtxManager::recover(std::string xid, std::auto_ptr txn, DtxBuffer::shared_ptr ops) +void DtxManager::recover(const std::string& xid, std::auto_ptr txn, DtxBuffer::shared_ptr ops) { createWork(xid)->recover(txn, ops); } bool DtxManager::prepare(const std::string& xid) { - return getWork(xid)->prepare(); + try { + return getWork(xid)->prepare(); + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } bool DtxManager::commit(const std::string& xid, bool onePhase) { - bool result = getWork(xid)->commit(onePhase); - remove(xid); - return result; + try { + bool result = getWork(xid)->commit(onePhase); + remove(xid); + return result; + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } void DtxManager::rollback(const std::string& xid) { - getWork(xid)->rollback(); - remove(xid); + try { + getWork(xid)->rollback(); + remove(xid); + } catch (DtxTimeoutException& e) { + remove(xid); + throw e; + } } DtxManager::WorkMap::iterator DtxManager::getWork(const std::string& xid) @@ -83,7 +100,7 @@ } } -DtxManager::WorkMap::iterator DtxManager::createWork(std::string& xid) +DtxManager::WorkMap::iterator DtxManager::createWork(std::string xid) { Mutex::ScopedLock locker(lock); WorkMap::iterator i = work.find(xid); @@ -91,5 +108,50 @@ throw ConnectionException(503, boost::format("Xid %1% is already known (use 'join' to add work to an 
existing xid)!") % xid); } else { return work.insert(xid, new DtxWorkRecord(xid, store)).first; + } +} + +void DtxManager::setTimeout(const std::string& xid, uint32_t secs) +{ + WorkMap::iterator record = getWork(xid); + DtxTimeout::shared_ptr timeout = record->getTimeout(); + if (timeout.get()) { + if (timeout->timeout == secs) return;//no need to do anything further if timeout hasn't changed + timeout->cancelled = true; + } + timeout = DtxTimeout::shared_ptr(new DtxTimeout(secs, *this, xid)); + record->setTimeout(timeout); + timer.add(boost::static_pointer_cast(timeout)); + +} + +uint32_t DtxManager::getTimeout(const std::string& xid) +{ + DtxTimeout::shared_ptr timeout = getWork(xid)->getTimeout(); + return !timeout ? 0 : timeout->timeout; +} + +void DtxManager::timedout(const std::string& xid) +{ + Mutex::ScopedLock locker(lock); + WorkMap::iterator i = work.find(xid); + if (i == work.end()) { + QPID_LOG(warning, "Transaction timeout failed: no record for xid"); + } else { + i->timedout(); + //TODO: do we want to have a timed task to cleanup, or can we rely on an explicit completion? 
+ //timer.add(TimerTask::shared_ptr(new DtxCleanup(60*30/*30 mins*/, *this, xid))); + } +} + +DtxManager::DtxCleanup::DtxCleanup(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), mgr(_mgr), xid(_xid) {} + +void DtxManager::DtxCleanup::fire() +{ + try { + mgr.remove(xid); + } catch (ConnectionException& e) { + //assume it was explicitly cleaned up after a call to prepare, commit or rollback } } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxManager.h Fri Jun 8 08:24:12 2007 @@ -24,6 +24,7 @@ #include #include "DtxBuffer.h" #include "DtxWorkRecord.h" +#include "Timer.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" @@ -34,23 +35,37 @@ class DtxManager{ typedef boost::ptr_map WorkMap; + + struct DtxCleanup : public TimerTask + { + DtxManager& mgr; + const std::string& xid; + + DtxCleanup(uint32_t timeout, DtxManager& mgr, const std::string& xid); + void fire(); + }; + WorkMap work; TransactionalStore* const store; qpid::sys::Mutex lock; + Timer timer; void remove(const std::string& xid); WorkMap::iterator getWork(const std::string& xid); - WorkMap::iterator createWork(std::string& xid); + WorkMap::iterator createWork(std::string xid); public: DtxManager(TransactionalStore* const store); ~DtxManager(); - void start(std::string xid, DtxBuffer::shared_ptr work); - void join(std::string xid, DtxBuffer::shared_ptr work); - void recover(std::string xid, std::auto_ptr txn, DtxBuffer::shared_ptr work); + void start(const std::string& xid, DtxBuffer::shared_ptr work); + void join(const 
std::string& xid, DtxBuffer::shared_ptr work); + void recover(const std::string& xid, std::auto_ptr txn, DtxBuffer::shared_ptr work); bool prepare(const std::string& xid); bool commit(const std::string& xid, bool onePhase); void rollback(const std::string& xid); + void setTimeout(const std::string& xid, uint32_t secs); + uint32_t getTimeout(const std::string& xid); + void timedout(const std::string& xid); }; } Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp?view=auto&rev=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp Fri Jun 8 08:24:12 2007 @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "DtxTimeout.h" +#include "DtxManager.h" +#include "qpid/sys/Time.h" + +using namespace qpid::broker; + +DtxTimeout::DtxTimeout(uint32_t _timeout, DtxManager& _mgr, const std::string& _xid) + : TimerTask(qpid::sys::Duration(_timeout * qpid::sys::TIME_SEC)), timeout(_timeout), mgr(_mgr), xid(_xid) +{ +} + +void DtxTimeout::fire() +{ + mgr.timedout(xid); +} Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h?view=auto&rev=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h Fri Jun 8 08:24:12 2007 @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#ifndef _DtxTimeout_ +#define _DtxTimeout_ + +#include "qpid/Exception.h" +#include "Timer.h" + +namespace qpid { +namespace broker { + +class DtxManager; + + +struct DtxTimeoutException : public Exception +{ + DtxTimeoutException() {} +}; + + +struct DtxTimeout : public TimerTask +{ + typedef boost::shared_ptr shared_ptr; + const uint32_t timeout; + DtxManager& mgr; + const std::string xid; + + DtxTimeout(uint32_t timeout, DtxManager& mgr, const std::string& xid); + void fire(); +}; + +} +} + + +#endif Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxTimeout.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.cpp Fri Jun 8 08:24:12 2007 @@ -27,9 +27,14 @@ using namespace qpid::broker; DtxWorkRecord::DtxWorkRecord(const std::string& _xid, TransactionalStore* const _store) : - xid(_xid), store(_store), completed(false), rolledback(false), prepared(false) {} + xid(_xid), store(_store), completed(false), rolledback(false), prepared(false), expired(false) {} -DtxWorkRecord::~DtxWorkRecord() {} +DtxWorkRecord::~DtxWorkRecord() +{ + if (timeout.get()) { + timeout->cancelled = true; + } +} bool DtxWorkRecord::prepare() { @@ -110,6 +115,9 @@ void DtxWorkRecord::add(DtxBuffer::shared_ptr ops) { Mutex::ScopedLock locker(lock); + if (expired) { + throw DtxTimeoutException(); + } if (completed) { throw 
ConnectionException(503, boost::format("Branch with xid %1% has been completed!") % xid); } @@ -118,6 +126,9 @@ bool DtxWorkRecord::check() { + if (expired) { + throw DtxTimeoutException(); + } if (!completed) { //iterate through all DtxBuffers and ensure they are all ended for (Work::iterator i = work.begin(); i != work.end(); i++) { @@ -148,4 +159,19 @@ ops->markEnded(); completed = true; prepared = true; +} + +void DtxWorkRecord::timedout() +{ + Mutex::ScopedLock locker(lock); + expired = true; + rolledback = true; + if (!completed) { + for (Work::iterator i = work.begin(); i != work.end(); i++) { + if (!(*i)->isEnded()) { + (*i)->timedout(); + } + } + } + abort(); } Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/DtxWorkRecord.h Fri Jun 8 08:24:12 2007 @@ -25,6 +25,7 @@ #include #include #include "DtxBuffer.h" +#include "DtxTimeout.h" #include "TransactionalStore.h" #include "qpid/framing/amqp_types.h" #include "qpid/sys/Mutex.h" @@ -46,6 +47,8 @@ bool completed; bool rolledback; bool prepared; + bool expired; + DtxTimeout::shared_ptr timeout; Work work; std::auto_ptr txn; qpid::sys::Mutex lock; @@ -61,6 +64,9 @@ void rollback(); void add(DtxBuffer::shared_ptr ops); void recover(std::auto_ptr txn, DtxBuffer::shared_ptr ops); + void timedout(); + void setTimeout(DtxTimeout::shared_ptr t) { timeout = t; } + DtxTimeout::shared_ptr getTimeout() { return timeout; } }; } Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp?view=auto&rev=545531 
============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp Fri Jun 8 08:24:12 2007 @@ -0,0 +1,100 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "Timer.h" +#include + +using qpid::sys::AbsTime; +using qpid::sys::Duration; +using qpid::sys::Monitor; +using qpid::sys::Thread; +using namespace qpid::broker; + +TimerTask::TimerTask(Duration timeout) : time(AbsTime::now(), timeout), cancelled(false) {} +TimerTask::TimerTask(AbsTime _time) : time(_time), cancelled(false) {} +TimerTask::~TimerTask(){} + +Timer::Timer() : active(false) +{ + start(); +} + +Timer::~Timer() +{ + stop(); +} + +void Timer::run() +{ + Monitor::ScopedLock l(monitor); + while(active){ + if (tasks.empty()) { + monitor.wait(); + } else { + TimerTask::shared_ptr t = tasks.top(); + if (t->cancelled) { + tasks.pop(); + } else if(t->time < AbsTime::now()) { + tasks.pop(); + t->fire(); + } else { + monitor.wait(t->time); + } + } + } +} + +void Timer::add(TimerTask::shared_ptr task) +{ + Monitor::ScopedLock l(monitor); + tasks.push(task); + monitor.notify(); +} + +void Timer::start() +{ + Monitor::ScopedLock l(monitor); + if (!active) { + active = true; + runner = std::auto_ptr(new Thread(this)); + } +} + +void Timer::stop() +{ + signalStop(); + if (runner.get()) { + runner->join(); + runner.reset(); + } +} +void Timer::signalStop() +{ + Monitor::ScopedLock l(monitor); + if (active) { + active = false; + monitor.notifyAll(); + } +} + +bool Later::operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const +{ + return a.get() && b.get() && a->time > b->time; +} Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Added: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h?view=auto&rev=545531 
============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h (added) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h Fri Jun 8 08:24:12 2007 @@ -0,0 +1,76 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#ifndef _Timer_ +#define _Timer_ + +#include +#include +#include +#include "qpid/sys/Monitor.h" +#include "qpid/sys/Thread.h" +#include "qpid/sys/Runnable.h" + +namespace qpid { +namespace broker { + +struct TimerTask +{ + typedef boost::shared_ptr shared_ptr; + + const qpid::sys::AbsTime time; + volatile bool cancelled; + + TimerTask(qpid::sys::Duration timeout); + TimerTask(qpid::sys::AbsTime time); + virtual ~TimerTask(); + virtual void fire() = 0; +}; + + struct Later + { + bool operator()(const TimerTask::shared_ptr& a, const TimerTask::shared_ptr& b) const; + }; + +class Timer : private qpid::sys::Runnable +{ + qpid::sys::Monitor monitor; + std::priority_queue, Later> tasks; + std::auto_ptr runner; + bool active; + + void run(); + void signalStop(); + +public: + Timer(); + ~Timer(); + + void add(TimerTask::shared_ptr task); + void start(); + void stop(); + +}; + +} +} + + +#endif Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/qpid/broker/Timer.h ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h (original) +++ incubator/qpid/trunk/qpid/cpp/src/qpid/sys/Time.h Fri Jun 8 08:24:12 2007 @@ -44,6 +44,9 @@ static AbsTime now(); inline static AbsTime FarFuture(); + + friend bool operator<(const AbsTime& a, const AbsTime& b); + friend bool operator>(const AbsTime& a, const AbsTime& b); }; class Duration { @@ -66,6 +69,9 @@ AbsTime AbsTime::FarFuture() { AbsTime ff; ff.time_ns = std::numeric_limits::max(); return ff;} inline 
AbsTime now() { return AbsTime::now(); } + +inline bool operator<(const AbsTime& a, const AbsTime& b) { return a.time_ns < b.time_ns; } +inline bool operator>(const AbsTime& a, const AbsTime& b) { return a.time_ns > b.time_ns; } Duration::Duration(int64_t time0) : nanosecs(time0) Modified: incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am (original) +++ incubator/qpid/trunk/qpid/cpp/src/tests/Makefile.am Fri Jun 8 08:24:12 2007 @@ -32,6 +32,7 @@ QueueRegistryTest \ QueueTest \ QueuePolicyTest \ + TimerTest \ TopicExchangeTest \ TxAckTest \ TxBufferTest \ Added: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp?view=auto&rev=545531 ============================================================================== --- incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp (added) +++ incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp Fri Jun 8 08:24:12 2007 @@ -0,0 +1,128 @@ + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/broker/Timer.h" +#include "qpid/sys/Monitor.h" +#include "qpid_test_plugin.h" +#include <iostream> +#include <memory> +#include <boost/format.hpp> +#include <boost/lexical_cast.hpp> + +using namespace qpid::broker; +using namespace qpid::sys; +using boost::dynamic_pointer_cast; + +class TimerTest : public CppUnit::TestCase +{ + CPPUNIT_TEST_SUITE(TimerTest); + CPPUNIT_TEST(testGeneral); + CPPUNIT_TEST_SUITE_END(); + + class Counter + { + Mutex lock; + uint counter; + public: + Counter() : counter(0) {} + uint next() + { + Mutex::ScopedLock l(lock); + return ++counter; + } + }; + + class TestTask : public TimerTask + { + const AbsTime start; + const Duration expected; + AbsTime end; + bool fired; + uint position; + Monitor monitor; + Counter& counter; + + public: + TestTask(Duration timeout, Counter& _counter) + : TimerTask(timeout), start(now()), expected(timeout), end(start), fired(false), counter(_counter) {} + + void fire() + { + Monitor::ScopedLock l(monitor); + fired = true; + position = counter.next(); + end = now(); + monitor.notify(); + } + + void check(uint expected_position, uint64_t tolerance = 500 * TIME_MSEC) + { + Monitor::ScopedLock l(monitor); + CPPUNIT_ASSERT(fired); + CPPUNIT_ASSERT_EQUAL(expected_position, position); + Duration actual(start, end); + uint64_t difference = abs(expected - actual); + std::string msg(boost::lexical_cast<std::string>(boost::format("tolerance = %1%, difference = %2%") % tolerance % difference)); + CPPUNIT_ASSERT_MESSAGE(msg, difference < tolerance); + } + + void wait(Duration d) + { + Monitor::ScopedLock l(monitor); + monitor.wait(AbsTime(now(), d)); + } + }; + + class DummyRunner : public Runnable + { + public: + void run() {} + }; + +public: + + void testGeneral() + { + Counter counter; + Timer timer; + TestTask::shared_ptr task1(new TestTask(Duration(3 * TIME_SEC), counter)); + TestTask::shared_ptr task2(new TestTask(Duration(1 * TIME_SEC), counter)); + 
TestTask::shared_ptr task3(new TestTask(Duration(4 * TIME_SEC), counter)); + TestTask::shared_ptr task4(new TestTask(Duration(2 * TIME_SEC), counter)); + + timer.add(task1); + timer.add(task2); + timer.add(task3); + timer.add(task4); + + dynamic_pointer_cast<TestTask>(task3)->wait(Duration(6 * TIME_SEC)); + + dynamic_pointer_cast<TestTask>(task1)->check(3); + dynamic_pointer_cast<TestTask>(task2)->check(1); + dynamic_pointer_cast<TestTask>(task3)->check(4); + dynamic_pointer_cast<TestTask>(task4)->check(2); + } +}; + +// Make this test suite a plugin. +CPPUNIT_PLUGIN_IMPLEMENT(); +CPPUNIT_TEST_SUITE_REGISTRATION(TimerTest); + Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp ------------------------------------------------------------------------------ svn:eol-style = native Propchange: incubator/qpid/trunk/qpid/cpp/src/tests/TimerTest.cpp ------------------------------------------------------------------------------ svn:keywords = Rev Date Modified: incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py?view=diff&rev=545531&r1=545530&r2=545531 ============================================================================== --- incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py (original) +++ incubator/qpid/trunk/qpid/python/tests_0-9/dtx.py Fri Jun 8 08:24:12 2007 @@ -21,6 +21,7 @@ from qpid.content import Content from qpid.testlib import testrunner, TestBase from struct import pack, unpack +from time import sleep class DtxTests(TestBase): """ @@ -37,6 +38,7 @@ """ XA_RBROLLBACK = 1 + XA_RBTIMEOUT = 2 XA_OK = 8 def test_simple_commit(self): @@ -450,6 +452,51 @@ self.assertEqual(self.XA_RBROLLBACK, channel1.dtx_coordination_prepare(xid=tx).flags) channel1.dtx_coordination_rollback(xid=tx) + + def test_get_timeout(self): + """ + Check that get-timeout returns the correct value, (and that a + transaction with a timeout can complete normally) + """ + channel = self.channel + tx = self.xid("dummy") + + 
channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.assertEqual(0, channel.dtx_coordination_get_timeout(xid=tx).timeout) + channel.dtx_coordination_set_timeout(xid=tx, timeout=60) + self.assertEqual(60, channel.dtx_coordination_get_timeout(xid=tx).timeout) + self.assertEqual(self.XA_OK, channel.dtx_demarcation_end(xid=tx).flags) + self.assertEqual(self.XA_OK, channel.dtx_coordination_rollback(xid=tx).flags) + + def test_set_timeout(self): + """ + Test the timeout of a transaction results in the expected + behaviour + """ + #open new channel to allow self.channel to be used in checking the queue + channel = self.client.channel(2) + channel.channel_open() + #setup: + tx = self.xid("dummy") + channel.queue_declare(queue="queue-a", exclusive=True) + channel.queue_declare(queue="queue-b", exclusive=True) + channel.message_transfer(routing_key="queue-a", message_id="timeout", body="DtxMessage") + + channel.dtx_demarcation_select() + channel.dtx_demarcation_start(xid=tx) + self.swap(channel, "queue-a", "queue-b") + channel.dtx_coordination_set_timeout(xid=tx, timeout=2) + sleep(3) + #check that the work has been rolled back already + self.assertMessageCount(1, "queue-a") + self.assertMessageCount(0, "queue-b") + self.assertMessageId("timeout", "queue-a") + #check the correct codes are returned when we try to complete the txn + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_demarcation_end(xid=tx).flags) + self.assertEqual(self.XA_RBTIMEOUT, channel.dtx_coordination_rollback(xid=tx).flags) + + + def test_recover(self): """