Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 3316 invoked from network); 18 Nov 2010 20:49:10 -0000 Received: from unknown (HELO mail.apache.org) (140.211.11.3) by 140.211.11.9 with SMTP; 18 Nov 2010 20:49:10 -0000 Received: (qmail 70637 invoked by uid 500); 18 Nov 2010 20:49:41 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 70613 invoked by uid 500); 18 Nov 2010 20:49:41 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 70606 invoked by uid 99); 18 Nov 2010 20:49:41 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Nov 2010 20:49:41 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 18 Nov 2010 20:49:37 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 58CF5238890D; Thu, 18 Nov 2010 20:48:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1036616 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ActiveMQSession.cpp activemq/core/ActiveMQSession.h activemq/core/ActiveMQTransactionContext.cpp cms/XAResource.h Date: Thu, 18 Nov 2010 20:48:22 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20101118204822.58CF5238890D@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Thu Nov 18 20:48:21 2010 New Revision: 1036616 URL: http://svn.apache.org/viewvc?rev=1036616&view=rev Log: https://issues.apache.org/activemq/browse/AMQCPP-329 Fix some issues found in testing. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1036616&r1=1036615&r2=1036616&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Nov 18 20:48:21 2010 @@ -67,6 +67,47 @@ using namespace decaf::lang; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// +namespace { + + /** + * Class used to Hook a session that has been closed into the Transaction + * it is currently a part of. Once the Transaction has been Committed or + * Rolled back this Synchronization can finish the Close of the session. + */ + class CloseSynhcronization : public Synchronization { + private: + + ActiveMQSession* session; + + public: + + CloseSynhcronization( ActiveMQSession* session ) { + + if( session == NULL ) { + throw NullPointerException( + __FILE__, __LINE__, "Synchronization Created with NULL Session."); + } + + this->session = session; + } + + virtual ~CloseSynhcronization() {} + + virtual void beforeEnd() { + } + + virtual void afterCommit() { + session->doClose(); + } + + virtual void afterRollback() { + session->doClose(); + } + + }; +} + +//////////////////////////////////////////////////////////////////////////////// ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection, const Pointer& id, cms::Session::AcknowledgeMode ackMode, @@ -86,6 +127,7 @@ ActiveMQSession::ActiveMQSession( Active this->connection = connection; this->closed = false; + this->synchronizationRegistered = false; this->ackMode = ackMode; this->lastDeliveredSequenceId = -1; @@ -128,6 +170,20 @@ void ActiveMQSession::close() { return; } + if( this->transaction->isInXATransaction() ) { + + // TODO - Right now we don't have a safe way of dealing with this case + // since the session might be deleted before the XA Transaction is finalized + // registering a Synchronization could result in an segmentation fault. + // + // For now we just close badly and throw an exception. + doClose(); + + throw UnsupportedOperationException( + __FILE__, __LINE__, + "The Consumer is still in an Active XA Transaction, commit it first." ); + } + try { doClose(); } @@ -182,7 +238,7 @@ void ActiveMQSession::dispose() { // Roll Back the transaction since we were closed without an explicit call // to commit it. - if( this->transaction.get() != NULL && this->transaction->isInTransaction() ){ + if( this->transaction->isInTransaction() ){ this->transaction->rollback(); } @@ -693,7 +749,7 @@ cms::Session::AcknowledgeMode ActiveMQSe //////////////////////////////////////////////////////////////////////////////// bool ActiveMQSession::isTransacted() const { - return this->ackMode == Session::SESSION_TRANSACTED; + return ( this->ackMode == Session::SESSION_TRANSACTED ) || this->transaction->isInXATransaction(); } //////////////////////////////////////////////////////////////////////////////// @@ -726,21 +782,10 @@ void ActiveMQSession::send( cms::Message amqMessage->setMessageId( id ); - if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) { - - // Ensure that a new transaction is started if this is the first message - // sent since the last commit. - doStartTransaction(); - - if( this->transaction.get() == NULL ) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQException::send - " - "Transacted Session, has no Transaction Info."); - } - - amqMessage->setTransactionId( this->transaction->getTransactionId() ); - } + // Ensure that a new transaction is started if this is the first message + // sent since the last commit. + doStartTransaction(); + amqMessage->setTransactionId( this->transaction->getTransactionId() ); // NOTE: // Now we copy the message before sending, this allows the user to reuse the @@ -928,11 +973,11 @@ void ActiveMQSession::oneway( Pointer command, unsigned int timeout ) { +Pointer ActiveMQSession::syncRequest( Pointer command, unsigned int timeout ) { try{ this->checkClosed(); - this->connection->syncRequest( command, timeout ); + return this->connection->syncRequest( command, timeout ); } AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException ) @@ -1035,11 +1080,9 @@ void ActiveMQSession::removeProducer( co //////////////////////////////////////////////////////////////////////////////// void ActiveMQSession::doStartTransaction() { - if( !this->isTransacted() ) { - throw ActiveMQException( __FILE__, __LINE__, "Not a Transacted Session" ); + if( this->isTransacted() && !this->transaction->isInXATransaction() ) { + this->transaction->begin(); } - - this->transaction->begin(); } //////////////////////////////////////////////////////////////////////////////// Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1036616&r1=1036615&r2=1036616&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Nov 18 20:48:21 2010 @@ -25,6 +25,7 @@ #include #include #include +#include #include #include #include @@ -90,6 +91,11 @@ namespace core{ bool closed; /** + * Bool to indicate if the Session has added a Syncronization to a TransactionContext. + */ + bool synchronizationRegistered; + + /** * Map of consumers. */ ConsumersMap consumers; @@ -329,22 +335,31 @@ namespace core{ } /** - * Sends a oneway message. - * @param command The message to send. - * @throws ActiveMQException if not currently connected, or - * if the operation fails for any reason. + * Sends a Command to the broker without requesting any Response be returned. + * . + * @param command + * The message to send to the Broker. + * + * @throws ActiveMQException if not currently connected, or if the + * operation fails for any reason. */ void oneway( Pointer command ); /** * Sends a synchronous request and returns the response from the broker. * Converts any error responses into an exception. - * @param command The request command. - * @param timeout The time to wait for a response, default is zero or infinite. + * + * @param command + * The command to send to the broker. + * @param timeout + * The time to wait for a response, default is zero or infinite. + * + * @returns Pointer to a Response object that the broker has returned for the Command sent. + * * @throws ActiveMQException thrown if an error response was received - * from the broker, or if any other error occurred. + * from the broker, or if any other error occurred. */ - void syncRequest( Pointer command, unsigned int timeout = 0 ); + Pointer syncRequest( Pointer command, unsigned int timeout = 0 ); /** * Adds a MessageConsumer to this session registering it with the Connection and store Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036616&r1=1036615&r2=1036616&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Nov 18 20:48:21 2010 @@ -18,6 +18,7 @@ #include #include +#include #include #include #include @@ -62,11 +63,6 @@ namespace core{ Pointer associatedXid; int beforeEndIndex; - // Global collection of all Ended XA Transactions. -// static ConcurrentStlMap< Pointer, -// StlList, -// TransactionId::COMPARATOR >* ENDED_XA_TRANSACTION_CONTEXTS; - TxContextData() { } @@ -148,6 +144,11 @@ void ActiveMQTransactionContext::begin() try{ + if( isInXATransaction() ) { + throw cms::TransactionInProgressException( + "Cannot start a local transaction while an XA Transaction is in progress."); + } + if( !isInTransaction() ) { synchronized( &this->synchronizations ) { @@ -180,6 +181,11 @@ void ActiveMQTransactionContext::commit( try{ + if( isInXATransaction() ) { + throw cms::TransactionInProgressException( + "Cannot Commit a local transaction while an XA Transaction is in progress."); + } + if( this->context->transactionId.get() == NULL ) { throw InvalidStateException( __FILE__, __LINE__, @@ -213,6 +219,11 @@ void ActiveMQTransactionContext::rollbac try{ + if( isInXATransaction() ) { + throw cms::TransactionInProgressException( + "Cannot Rollback a local transaction while an XA Transaction is in progress."); + } + if( this->context->transactionId == NULL ) { throw InvalidStateException( __FILE__, __LINE__, @@ -425,13 +436,6 @@ int ActiveMQTransactionContext::prepare( if( XAResource::XA_RDONLY == intResponse->getResult() ) { // transaction stops now, may be syncs that need a callback -// StlList l = this->context->ENDED_XA_TRANSACTION_CONTEXTS.remove(x); -// if (l != NULL && !l.isEmpty()) { -// for (TransactionContext ctx : l) { -// ctx.afterCommit(); -// } -// } - this->afterCommit(); } @@ -447,15 +451,6 @@ int ActiveMQTransactionContext::prepare( throw toXAException( e ); } catch( CMSException& e ) { -// List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); -// if (l != NULL && !l.isEmpty()) { -// for (TransactionContext ctx : l) { -// try { -// ctx.afterRollback(); -// } catch (Throwable ignored) { -// } -// } -// } try{ this->afterRollback(); @@ -495,13 +490,6 @@ void ActiveMQTransactionContext::commit( this->connection->syncRequest( info ); -// List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); -// if (l != NULL && !l.isEmpty()) { -// for (TransactionContext ctx : l) { -// ctx.afterCommit(); -// } -// } - this->afterCommit(); } catch( Exception& ex ) { @@ -514,15 +502,6 @@ void ActiveMQTransactionContext::commit( throw toXAException( ex ); } catch( CMSException& e ) { -// List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); -// if (l != NULL && !l.isEmpty()) { -// for (TransactionContext ctx : l) { -// try { -// ctx.afterRollback(); -// } catch(...) { -// } -// } -// } try { this->afterRollback(); @@ -562,13 +541,6 @@ void ActiveMQTransactionContext::rollbac this->connection->syncRequest( info ); -// List l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x); -// if (l != NULL && !l.isEmpty()) { -// for (TransactionContext ctx : l) { -// ctx.afterRollback(); -// } -// } - this->afterRollback(); } catch( Exception& ex ) { Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h?rev=1036616&r1=1036615&r2=1036616&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h Thu Nov 18 20:48:21 2010 @@ -141,8 +141,8 @@ namespace cms { * previously been used as the parameter to a start. * method. * @param flags - * a flags integer - one of: XAResource.TMSUCCESS, - * XAResource.TMFAIL, or XAResource.TMSUSPEND. + * a flags integer - one of: XAResource::TMSUCCESS, + * XAResource::TMFAIL, or XAResource::TMSUSPEND. *

* TMSUCCESS means that this section of work completed * successfully. @@ -231,8 +231,8 @@ namespace cms { * states. * * @param flag - * an integer. Must be one of: XAResource.TMSTARTRSCAN, - * XAResource.TMENDRSCAN, XAResource.TMNOFLAGS. + * an integer. Must be one of: XAResource::TMSTARTRSCAN, + * XAResource::TMENDRSCAN, XAResource::TMNOFLAGS. * * @return an array of zero or more XIDs identifying the transaction * branches in the prepared or heuristically completed states. @@ -277,8 +277,8 @@ namespace cms { * @param xid * the XID which identifies the transaction branch. * @param flags - * an integer. Must be one of XAResource.TMNOFLAGS, - * XAResource.TMJOIN, or XAResource.TMRESUME. + * an integer. Must be one of XAResource::TMNOFLAGS, + * XAResource::TMJOIN, or XAResource::TMRESUME. *

* TMJOIN implies that the start applies to joining a transaction * previously passed to the Resource Manager.