activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1036616 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: activemq/core/ActiveMQSession.cpp activemq/core/ActiveMQSession.h activemq/core/ActiveMQTransactionContext.cpp cms/XAResource.h
Date Thu, 18 Nov 2010 20:48:22 GMT
Author: tabish
Date: Thu Nov 18 20:48:21 2010
New Revision: 1036616

URL: http://svn.apache.org/viewvc?rev=1036616&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-329

Fix some issues found in testing.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1036616&r1=1036615&r2=1036616&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu
Nov 18 20:48:21 2010
@@ -67,6 +67,47 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    /**
+     * Class used to Hook a session that has been closed into the Transaction
+     * it is currently a part of.  Once the Transaction has been Committed or
+     * Rolled back this Synchronization can finish the Close of the session.
+     */
+    class CloseSynhcronization : public Synchronization {
+    private:
+
+        ActiveMQSession* session;
+
+    public:
+
+        CloseSynhcronization( ActiveMQSession* session ) {
+
+            if( session == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Session.");
+            }
+
+            this->session = session;
+        }
+
+        virtual ~CloseSynhcronization() {}
+
+        virtual void beforeEnd() {
+        }
+
+        virtual void afterCommit() {
+            session->doClose();
+        }
+
+        virtual void afterRollback() {
+            session->doClose();
+        }
+
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
 ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection,
                                   const Pointer<SessionId>& id,
                                   cms::Session::AcknowledgeMode ackMode,
@@ -86,6 +127,7 @@ ActiveMQSession::ActiveMQSession( Active
 
     this->connection = connection;
     this->closed = false;
+    this->synchronizationRegistered = false;
     this->ackMode = ackMode;
     this->lastDeliveredSequenceId = -1;
 
@@ -128,6 +170,20 @@ void ActiveMQSession::close() {
         return;
     }
 
+    if( this->transaction->isInXATransaction() ) {
+
+        // TODO - Right now we don't have a safe way of dealing with this case
+        // since the session might be deleted before the XA Transaction is finalized
+        // registering a Synchronization could result in an segmentation fault.
+        //
+        // For now we just close badly and throw an exception.
+        doClose();
+
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "The Consumer is still in an Active XA Transaction, commit it first." );
+    }
+
     try {
         doClose();
     }
@@ -182,7 +238,7 @@ void ActiveMQSession::dispose() {
 
         // Roll Back the transaction since we were closed without an explicit call
         // to commit it.
-        if( this->transaction.get() != NULL && this->transaction->isInTransaction()
){
+        if( this->transaction->isInTransaction() ){
             this->transaction->rollback();
         }
 
@@ -693,7 +749,7 @@ cms::Session::AcknowledgeMode ActiveMQSe
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSession::isTransacted() const {
-    return this->ackMode == Session::SESSION_TRANSACTED;
+    return ( this->ackMode == Session::SESSION_TRANSACTED ) || this->transaction->isInXATransaction();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -726,21 +782,10 @@ void ActiveMQSession::send( cms::Message
 
         amqMessage->setMessageId( id );
 
-        if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
-
-            // Ensure that a new transaction is started if this is the first message
-            // sent since the last commit.
-            doStartTransaction();
-
-            if( this->transaction.get() == NULL ) {
-                throw ActiveMQException(
-                    __FILE__, __LINE__,
-                    "ActiveMQException::send - "
-                    "Transacted Session, has no Transaction Info.");
-            }
-
-            amqMessage->setTransactionId( this->transaction->getTransactionId()
);
-        }
+        // Ensure that a new transaction is started if this is the first message
+        // sent since the last commit.
+        doStartTransaction();
+        amqMessage->setTransactionId( this->transaction->getTransactionId() );
 
         // NOTE:
         // Now we copy the message before sending, this allows the user to reuse the
@@ -928,11 +973,11 @@ void ActiveMQSession::oneway( Pointer<Co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::syncRequest( Pointer<Command> command, unsigned int timeout )
{
+Pointer<Response> ActiveMQSession::syncRequest( Pointer<Command> command, unsigned
int timeout ) {
 
     try{
         this->checkClosed();
-        this->connection->syncRequest( command, timeout );
+        return this->connection->syncRequest( command, timeout );
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1035,11 +1080,9 @@ void ActiveMQSession::removeProducer( co
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::doStartTransaction() {
 
-    if( !this->isTransacted() ) {
-        throw ActiveMQException( __FILE__, __LINE__, "Not a Transacted Session" );
+    if( this->isTransacted() && !this->transaction->isInXATransaction()
) {
+        this->transaction->begin();
     }
-
-    this->transaction->begin();
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1036616&r1=1036615&r2=1036616&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu
Nov 18 20:48:21 2010
@@ -25,6 +25,7 @@
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/Response.h>
 #include <activemq/commands/SessionInfo.h>
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/ConsumerId.h>
@@ -90,6 +91,11 @@ namespace core{
         bool closed;
 
         /**
+         * Bool to indicate if the Session has added a Syncronization to a TransactionContext.
+         */
+        bool synchronizationRegistered;
+
+        /**
          * Map of consumers.
          */
         ConsumersMap consumers;
@@ -329,22 +335,31 @@ namespace core{
         }
 
         /**
-         * Sends a oneway message.
-         * @param command The message to send.
-         * @throws ActiveMQException if not currently connected, or
-         * if the operation fails for any reason.
+         * Sends a Command to the broker without requesting any Response be returned.
+         * .
+         * @param command
+         *      The message to send to the Broker.
+         *
+         * @throws ActiveMQException if not currently connected, or if the
+         *         operation fails for any reason.
          */
         void oneway( Pointer<commands::Command> command );
 
         /**
          * Sends a synchronous request and returns the response from the broker.
          * Converts any error responses into an exception.
-         * @param command The request command.
-         * @param timeout The time to wait for a response, default is zero or infinite.
+         *
+         * @param command
+         *      The command to send to the broker.
+         * @param timeout
+         *      The time to wait for a response, default is zero or infinite.
+         *
+         * @returns Pointer to a Response object that the broker has returned for the Command
sent.
+         *
          * @throws ActiveMQException thrown if an error response was received
-         * from the broker, or if any other error occurred.
+         *         from the broker, or if any other error occurred.
          */
-        void syncRequest( Pointer<commands::Command> command, unsigned int timeout
= 0 );
+        Pointer<commands::Response> syncRequest( Pointer<commands::Command> command,
unsigned int timeout = 0 );
 
         /**
          * Adds a MessageConsumer to this session registering it with the Connection and
store

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1036616&r1=1036615&r2=1036616&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
Thu Nov 18 20:48:21 2010
@@ -18,6 +18,7 @@
 
 #include <cms/Xid.h>
 #include <cms/XAException.h>
+#include <cms/TransactionInProgressException.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
@@ -62,11 +63,6 @@ namespace core{
         Pointer<Xid> associatedXid;
         int beforeEndIndex;
 
-        // Global collection of all Ended XA Transactions.
-//        static ConcurrentStlMap< Pointer<TransactionId>,
-//                                 StlList<Synchronization*>,
-//                                 TransactionId::COMPARATOR >* ENDED_XA_TRANSACTION_CONTEXTS;
-
         TxContextData() {
         }
 
@@ -148,6 +144,11 @@ void ActiveMQTransactionContext::begin()
 
     try{
 
+        if( isInXATransaction() ) {
+            throw cms::TransactionInProgressException(
+                "Cannot start a local transaction while an XA Transaction is in progress.");
+        }
+
         if( !isInTransaction() ) {
 
             synchronized( &this->synchronizations ) {
@@ -180,6 +181,11 @@ void ActiveMQTransactionContext::commit(
 
     try{
 
+        if( isInXATransaction() ) {
+            throw cms::TransactionInProgressException(
+                "Cannot Commit a local transaction while an XA Transaction is in progress.");
+        }
+
         if( this->context->transactionId.get() == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
@@ -213,6 +219,11 @@ void ActiveMQTransactionContext::rollbac
 
     try{
 
+        if( isInXATransaction() ) {
+            throw cms::TransactionInProgressException(
+                "Cannot Rollback a local transaction while an XA Transaction is in progress.");
+        }
+
         if( this->context->transactionId == NULL ) {
             throw InvalidStateException(
                 __FILE__, __LINE__,
@@ -425,13 +436,6 @@ int ActiveMQTransactionContext::prepare(
         if( XAResource::XA_RDONLY == intResponse->getResult() ) {
 
             // transaction stops now, may be syncs that need a callback
-//            StlList<TransactionContext> l = this->context->ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-//            if (l != NULL && !l.isEmpty()) {
-//                for (TransactionContext ctx : l) {
-//                    ctx.afterCommit();
-//                }
-//            }
-
             this->afterCommit();
         }
 
@@ -447,15 +451,6 @@ int ActiveMQTransactionContext::prepare(
         throw toXAException( e );
 
     } catch( CMSException& e ) {
-//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-//        if (l != NULL && !l.isEmpty()) {
-//            for (TransactionContext ctx : l) {
-//                try {
-//                    ctx.afterRollback();
-//                } catch (Throwable ignored) {
-//                }
-//            }
-//        }
 
         try{
             this->afterRollback();
@@ -495,13 +490,6 @@ void ActiveMQTransactionContext::commit(
 
         this->connection->syncRequest( info );
 
-//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-//        if (l != NULL && !l.isEmpty()) {
-//            for (TransactionContext ctx : l) {
-//                ctx.afterCommit();
-//            }
-//        }
-
         this->afterCommit();
 
     } catch( Exception& ex ) {
@@ -514,15 +502,6 @@ void ActiveMQTransactionContext::commit(
         throw toXAException( ex );
 
     } catch( CMSException& e ) {
-//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-//        if (l != NULL && !l.isEmpty()) {
-//            for (TransactionContext ctx : l) {
-//                try {
-//                    ctx.afterRollback();
-//                } catch(...) {
-//                }
-//            }
-//        }
 
         try {
             this->afterRollback();
@@ -562,13 +541,6 @@ void ActiveMQTransactionContext::rollbac
 
         this->connection->syncRequest( info );
 
-//        List<TransactionContext> l = ENDED_XA_TRANSACTION_CONTEXTS.remove(x);
-//        if (l != NULL && !l.isEmpty()) {
-//            for (TransactionContext ctx : l) {
-//                ctx.afterRollback();
-//            }
-//        }
-
         this->afterRollback();
 
     } catch( Exception& ex ) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h?rev=1036616&r1=1036615&r2=1036616&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAResource.h Thu Nov 18 20:48:21
2010
@@ -141,8 +141,8 @@ namespace cms {
          *      previously been used as the parameter to a start.
          *      method.
          * @param flags
-         *      a flags integer - one of: XAResource.TMSUCCESS,
-         *      XAResource.TMFAIL, or XAResource.TMSUSPEND.
+         *      a flags integer - one of: XAResource::TMSUCCESS,
+         *      XAResource::TMFAIL, or XAResource::TMSUSPEND.
          *      <p>
          *      TMSUCCESS means that this section of work completed
          *      successfully.
@@ -231,8 +231,8 @@ namespace cms {
          * states.
          *
          * @param flag
-         *      an integer. Must be one of: XAResource.TMSTARTRSCAN,
-         *      XAResource.TMENDRSCAN, XAResource.TMNOFLAGS.
+         *      an integer. Must be one of: XAResource::TMSTARTRSCAN,
+         *      XAResource::TMENDRSCAN, XAResource::TMNOFLAGS.
          *
          * @return an array of zero or more XIDs identifying the transaction
          *         branches in the prepared or heuristically completed states.
@@ -277,8 +277,8 @@ namespace cms {
          * @param xid
          *      the XID which identifies the transaction branch.
          * @param flags
-         *      an integer. Must be one of XAResource.TMNOFLAGS,
-         *      XAResource.TMJOIN, or XAResource.TMRESUME.
+         *      an integer. Must be one of XAResource::TMNOFLAGS,
+         *      XAResource::TMJOIN, or XAResource::TMRESUME.
          *      <p>
          *      TMJOIN implies that the start applies to joining a transaction
          *      previously passed to the Resource Manager.



Mime
View raw message