activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1026370 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp ActiveMQConnection.h
Date Fri, 22 Oct 2010 15:24:40 GMT
Author: tabish
Date: Fri Oct 22 15:24:40 2010
New Revision: 1026370

URL: http://svn.apache.org/viewvc?rev=1026370&view=rev
Log:
Improved onException handling with work towards eventually allowing the ActiveMQConnection
to be pooled.  Propagates the exception that caused the Connection to fail so that ActiveMQConsumer
can detect that its Connection has failed even if it failed from an Async error.

Should also fix https://issues.apache.org/activemq/browse/AMQCPP-316

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1026370&r1=1026369&r2=1026370&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Fri Oct 22 15:24:40 2010
@@ -370,11 +370,11 @@ void ActiveMQConnection::close() {
 
         long long lastDeliveredSequenceId = 0;
 
-        // Close all of the resources.
+        // Dispose of all the Session resources we know are still open.
         for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
             ActiveMQSession* session = allSessions[ix];
             try{
-                session->close();
+                session->dispose();
 
                 lastDeliveredSequenceId =
                     Math::max( lastDeliveredSequenceId, session->getLastDeliveredSequenceId() );
@@ -395,6 +395,45 @@ void ActiveMQConnection::close() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::cleanup() {
+
+    try{
+
+        // Get the complete list of active sessions.
+        std::vector<ActiveMQSession*> allSessions;
+        synchronized( &activeSessions ) {
+            allSessions = activeSessions.toArray();
+        }
+
+        // Dispose of all the Session resources we know are still open.
+        for( unsigned int ix=0; ix<allSessions.size(); ++ix ){
+            ActiveMQSession* session = allSessions[ix];
+            try{
+                session->dispose();
+            } catch( cms::CMSException& ex ){
+                /* Absorb */
+            }
+        }
+
+        if( this->config->isConnectionInfoSentToBroker ) {
+            if( !transportFailed.get() && !closing.get() ) {
+                this->syncRequest( this->config->connectionInfo->createRemoveCommand() );
+            }
+            this->config->isConnectionInfoSentToBroker = false;
+        }
+
+        if( this->config->userSpecifiedClientID ) {
+            this->config->connectionInfo->setClientId("");
+            this->config->userSpecifiedClientID = false;
+        }
+
+        this->config->clientIDSet = false;
+        this->started.set( false );
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::start() {
 
     try{
@@ -445,14 +484,17 @@ void ActiveMQConnection::disconnect( lon
         // Clear the listener, we don't care about async errors at this point.
         this->config->transport->setTransportListener( NULL );
 
-        // Remove our ConnectionId from the Broker
-        Pointer<RemoveInfo> command( this->config->connectionInfo->createRemoveCommand() );
-        command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
-        this->syncRequest( command, this->getCloseTimeout() );
-
-        // Send the disconnect command to the broker.
-        Pointer<ShutdownInfo> shutdown( new ShutdownInfo() );
-        oneway( shutdown );
+        if( this->config->isConnectionInfoSentToBroker ) {
+
+            // Remove our ConnectionId from the Broker
+            Pointer<RemoveInfo> command( this->config->connectionInfo->createRemoveCommand() );
+            command->setLastDeliveredSequenceId( lastDeliveredSequenceId );
+            this->syncRequest( command, this->config->closeTimeout );
+
+            // Send the disconnect command to the broker.
+            Pointer<ShutdownInfo> shutdown( new ShutdownInfo() );
+            oneway( shutdown );
+        }
 
         // Allow the Support class to shutdown its resources, including the Transport.
         bool hasException = false;
@@ -662,12 +704,26 @@ void ActiveMQConnection::onException( co
             return;
         }
 
+        onAsyncException(ex);
+
+        // TODO This part should be done in a separate Thread.
+
         // Mark this Connection as having a Failed transport.
         this->transportFailed.set( true );
-        this->config->firstFailureError.reset( ex.clone() );
+        if( this->config->firstFailureError == NULL ) {
+            this->config->firstFailureError.reset( ex.clone() );
+        }
 
-        // Inform the user of the error.
-        fire( exceptions::ActiveMQException( ex ) );
+        // TODO - Until this fires in another thread we can't dipose of
+        //        the transport here since it could result in this method
+        //        being called again recursively.
+        try{
+            // this->config->transport->stop();
+        } catch(...) {
+        }
+
+        // Clean up the Connection resources.
+        cleanup();
 
         synchronized( &transportListeners ) {
 
@@ -685,6 +741,20 @@ void ActiveMQConnection::onException( co
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onAsyncException( const decaf::lang::Exception& ex ) {
+
+    if( !this->isClosed() || !this->closing.get() ) {
+
+        if( this->config->exceptionListener != NULL ) {
+
+            // Inform the user of the error.
+            // TODO - This should be run by another Thread, i.e. Executor
+            fire( exceptions::ActiveMQException( ex ) );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::transportInterrupted() {
 
     this->config->transportInterruptionProcessingComplete.reset( new CountDownLatch( (int)dispatchers.size() ) );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1026370&r1=1026369&r2=1026370&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Fri Oct 22 15:24:40 2010
@@ -615,6 +615,12 @@ namespace core{
         transport::Transport& getTransport() const;
 
         /**
+         * Clean up this connection object, reseting it back to a state that mirrors
+         * what a newly created ActiveMQConnection object has.
+         */
+        void cleanup();
+
+        /**
          * Sends a oneway message.
          * @param command The message to send.
          * @throws ConnectorException if not currently connected, or
@@ -651,6 +657,14 @@ namespace core{
          */
         decaf::lang::Exception* getFirstFailureError() const;
 
+        /**
+         * Event handler for dealing with async exceptions.
+         *
+         * @param ex
+         *      The exception that caused the error condition.
+         */
+        void onAsyncException( const decaf::lang::Exception& ex );
+
     private:
 
         // Sends a oneway disconnect message to the broker.



Mime
View raw message