activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1026151 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp ActiveMQConnection.h ActiveMQConsumer.cpp ActiveMQConsumer.h ActiveMQProducer.cpp ActiveMQProducer.h ActiveMQSession.cpp ActiveMQSession.h
Date Thu, 21 Oct 2010 21:48:13 GMT
Author: tabish
Date: Thu Oct 21 21:48:12 2010
New Revision: 1026151

URL: http://svn.apache.org/viewvc?rev=1026151&view=rev
Log:
Refactor the shutdown methods in Consumer, Producer and Session to streamline closing out
resources when the parent is closed before the children, avoids unneeded message sends and
fixes a couple issues.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Thu Oct 21 21:48:12 2010
@@ -1093,3 +1093,8 @@ bool ActiveMQConnection::isMessagePriori
 void ActiveMQConnection::setMessagePrioritySupported( bool value ) {
     this->config->messagePrioritySupported = value;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+decaf::lang::Exception* ActiveMQConnection::getFirstFailureError() const {
+    return this->config->firstFailureError.get();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Thu
Oct 21 21:48:12 2010
@@ -644,6 +644,13 @@ namespace core{
          */
         void setTransportInterruptionProcessingComplete();
 
+        /**
+         * Gets the pointer to the first exception that caused the Connection to become failed.
+         *
+         * @returns pointer to and Exception instance or NULL if none is set.
+         */
+        decaf::lang::Exception* getFirstFailureError() const;
+
     private:
 
         // Sends a oneway disconnect message to the broker.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu
Oct 21 21:48:12 2010
@@ -366,6 +366,24 @@ void ActiveMQConsumer::doClose() {
 
     try {
 
+        dispose();
+        // Remove at the Broker Side, consumer has been removed from the local
+        // Session and Connection objects so if the remote call to remove throws
+        // it is okay to propagate to the client.
+        Pointer<RemoveInfo> info( new RemoveInfo );
+        info->setObjectId( this->consumerInfo->getConsumerId() );
+        info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId
);
+        this->session->oneway( info );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::dispose() {
+
+    try{
         if( !this->isClosed() ) {
 
             if( !session->isTransacted() ) {
@@ -380,7 +398,7 @@ void ActiveMQConsumer::doClose() {
 
             // Purge all the pending messages
             try{
-                internal->unconsumedMessages->clear();
+                this->internal->unconsumedMessages->clear();
             } catch ( ActiveMQException& ex ){
                 if( !haveException ){
                     ex.setMark( __FILE__, __LINE__ );
@@ -390,7 +408,7 @@ void ActiveMQConsumer::doClose() {
             }
 
             // Stop and Wakeup all sync consumers.
-            internal->unconsumedMessages->close();
+            this->internal->unconsumedMessages->close();
 
             if( this->session->isIndividualAcknowledge() ) {
                 // For IndividualAck Mode we need to unlink the ack handler to remove a
@@ -406,15 +424,7 @@ void ActiveMQConsumer::doClose() {
             }
 
             // Remove this Consumer from the Connections set of Dispatchers
-            this->session->removeConsumer( this->consumerInfo->getConsumerId(),
this->internal->lastDeliveredSequenceId );
-
-            // Remove at the Broker Side, consumer has been removed from the local
-            // Session and Connection objects so if the remote call to remove throws
-            // it is okay to propagate to the client.
-            Pointer<RemoveInfo> info( new RemoveInfo );
-            info->setObjectId( this->consumerInfo->getConsumerId() );
-            info->setLastDeliveredSequenceId( this->internal->lastDeliveredSequenceId
);
-            this->session->oneway( info );
+            this->session->removeConsumer( this->consumerInfo->getConsumerId()
);
 
             // If we encountered an error, propagate it.
             if( haveException ){
@@ -650,8 +660,8 @@ void ActiveMQConsumer::beforeMessageIsCo
     if( !isAutoAcknowledgeBatch() ) {
 
         // When not in an Auto
-        synchronized( &internal->dispatchedMessages ) {
-            internal->dispatchedMessages.enqueueFront( dispatch );
+        synchronized( &this->internal->dispatchedMessages ) {
+            this->internal->dispatchedMessages.enqueueFront( dispatch );
         }
 
         if( this->session->isTransacted() ) {
@@ -666,7 +676,7 @@ void ActiveMQConsumer::afterMessageIsCon
 
     try{
 
-        if( internal->unconsumedMessages->isClosed() ) {
+        if( this->internal->unconsumedMessages->isClosed() ) {
             return;
         }
 
@@ -680,13 +690,13 @@ void ActiveMQConsumer::afterMessageIsCon
 
             if( this->internal->deliveringAcks.compareAndSet( false, true ) ) {
 
-                synchronized( &internal->dispatchedMessages ) {
-                    if( !internal->dispatchedMessages.empty() ) {
+                synchronized( &this->internal->dispatchedMessages ) {
+                    if( !this->internal->dispatchedMessages.empty() ) {
                         Pointer<MessageAck> ack = makeAckForAllDeliveredMessages(
                             ActiveMQConstants::ACK_TYPE_CONSUMED );
 
                         if( ack != NULL ) {
-                            internal->dispatchedMessages.clear();
+                            this->internal->dispatchedMessages.clear();
                             session->oneway( ack );
                         }
                     }
@@ -702,7 +712,7 @@ void ActiveMQConsumer::afterMessageIsCon
 
             bool messageUnackedByConsumer = false;
 
-            synchronized( &internal->dispatchedMessages ) {
+            synchronized( &this->internal->dispatchedMessages ) {
                 std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
                 while( iter->hasNext() ) {
                     if( iter->next() == message ) {
@@ -736,21 +746,21 @@ void ActiveMQConsumer::deliverAcks() {
 
             if( isAutoAcknowledgeEach() ) {
 
-                synchronized( &internal->dispatchedMessages ) {
+                synchronized( &this->internal->dispatchedMessages ) {
 
                     ack = makeAckForAllDeliveredMessages( ActiveMQConstants::ACK_TYPE_CONSUMED
);
 
                     if( ack != NULL ) {
-                        internal->dispatchedMessages.clear();
+                        this->internal->dispatchedMessages.clear();
                     } else {
                         ack.swap( internal->pendingAck );
                     }
                 }
 
-            } else if( internal->pendingAck != NULL &&
-                       internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED
) {
+            } else if( this->internal->pendingAck != NULL &&
+                       this->internal->pendingAck->getAckType() == ActiveMQConstants::ACK_TYPE_CONSUMED
) {
 
-                ack.swap( internal->pendingAck );
+                ack.swap( this->internal->pendingAck );
             }
 
             if( ack != NULL ) {
@@ -776,8 +786,8 @@ void ActiveMQConsumer::ackLater( const P
     // consumer got the message to expand the pre-fetch window
     if( session->isTransacted() ) {
         session->doStartTransaction();
-        if( !internal->synchronizationRegistered ) {
-            internal->synchronizationRegistered = true;
+        if( !this->internal->synchronizationRegistered ) {
+            this->internal->synchronizationRegistered = true;
 
             Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
             this->session->getTransactionContext()->addSynchronization( sync );
@@ -786,20 +796,20 @@ void ActiveMQConsumer::ackLater( const P
 
     // The delivered message list is only needed for the recover method
     // which is only used with client ack.
-    internal->deliveredCounter++;
+    this->internal->deliveredCounter++;
 
-    Pointer<MessageAck> oldPendingAck = internal->pendingAck;
-    internal->pendingAck.reset( new MessageAck() );
-    internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
-    internal->pendingAck->setAckType( (unsigned char)ackType );
-    internal->pendingAck->setDestination( dispatch->getDestination() );
-    internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId()
);
-    internal->pendingAck->setMessageCount( internal->deliveredCounter );
+    Pointer<MessageAck> oldPendingAck = this->internal->pendingAck;
+    this->internal->pendingAck.reset( new MessageAck() );
+    this->internal->pendingAck->setConsumerId( dispatch->getConsumerId() );
+    this->internal->pendingAck->setAckType( (unsigned char)ackType );
+    this->internal->pendingAck->setDestination( dispatch->getDestination() );
+    this->internal->pendingAck->setLastMessageId( dispatch->getMessage()->getMessageId()
);
+    this->internal->pendingAck->setMessageCount( internal->deliveredCounter );
 
     if( oldPendingAck == NULL ) {
-        internal->pendingAck->setFirstMessageId( internal->pendingAck->getLastMessageId()
);
-    } else if ( oldPendingAck->getAckType() == internal->pendingAck->getAckType()
) {
-        internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId()
);
+        this->internal->pendingAck->setFirstMessageId( this->internal->pendingAck->getLastMessageId()
);
+    } else if ( oldPendingAck->getAckType() == this->internal->pendingAck->getAckType()
) {
+        this->internal->pendingAck->setFirstMessageId( oldPendingAck->getFirstMessageId()
);
     } else {
         // old pending ack being superseded by ack of another type, if is is not a delivered
         // ack and hence important, send it now so it is not lost.
@@ -809,33 +819,33 @@ void ActiveMQConsumer::ackLater( const P
     }
 
     if( session->isTransacted() ) {
-        internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId()
);
+        this->internal->pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId()
);
     }
 
     if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( internal->deliveredCounter
- internal->additionalWindowSize ) ) {
         session->oneway( this->internal->pendingAck );
-        internal->pendingAck.reset( NULL );
-        internal->deliveredCounter = 0;
-        internal->additionalWindowSize = 0;
+        this->internal->pendingAck.reset( NULL );
+        this->internal->deliveredCounter = 0;
+        this->internal->additionalWindowSize = 0;
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<MessageAck> ActiveMQConsumer::makeAckForAllDeliveredMessages( int type ) {
 
-    synchronized( &internal->dispatchedMessages ) {
+    synchronized( &this->internal->dispatchedMessages ) {
 
-        if( !internal->dispatchedMessages.empty() ) {
+        if( !this->internal->dispatchedMessages.empty() ) {
 
-            Pointer<MessageDispatch> dispatched = internal->dispatchedMessages.front();
+            Pointer<MessageDispatch> dispatched = this->internal->dispatchedMessages.front();
 
             Pointer<MessageAck> ack( new MessageAck() );
             ack->setAckType( (unsigned char)type );
             ack->setConsumerId( dispatched->getConsumerId() );
             ack->setDestination( dispatched->getDestination() );
-            ack->setMessageCount( (int)internal->dispatchedMessages.size() );
+            ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
             ack->setLastMessageId( dispatched->getMessage()->getMessageId() );
-            ack->setFirstMessageId( internal->dispatchedMessages.back()->getMessage()->getMessageId()
);
+            ack->setFirstMessageId( this->internal->dispatchedMessages.back()->getMessage()->getMessageId()
);
 
             return ack;
         }
@@ -863,7 +873,7 @@ void ActiveMQConsumer::acknowledge( cons
 
             session->oneway( ack );
 
-            synchronized( &internal->dispatchedMessages ) {
+            synchronized( &this->internal->dispatchedMessages ) {
                 std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
                 while( iter->hasNext() ) {
                     if( iter->next() == dispatch ) {
@@ -887,7 +897,7 @@ void ActiveMQConsumer::acknowledge() {
 
     try{
 
-        synchronized( &internal->dispatchedMessages ) {
+        synchronized( &this->internal->dispatchedMessages ) {
 
             // Acknowledge all messages so far.
             Pointer<MessageAck> ack =
@@ -907,9 +917,9 @@ void ActiveMQConsumer::acknowledge() {
 
             // Adjust the counters
             this->internal->deliveredCounter =
-                Math::max( 0, internal->deliveredCounter - (int)internal->dispatchedMessages.size());
+                Math::max( 0, this->internal->deliveredCounter - (int)this->internal->dispatchedMessages.size());
             this->internal->additionalWindowSize =
-                Math::max(0, internal->additionalWindowSize - (int)internal->dispatchedMessages.size());
+                Math::max(0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size());
 
             if( !session->isTransacted() ) {
                 this->internal->dispatchedMessages.clear();
@@ -931,24 +941,24 @@ void ActiveMQConsumer::commit() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::rollback() {
 
-    synchronized( internal->unconsumedMessages.get() ) {
+    synchronized( this->internal->unconsumedMessages.get() ) {
 
-        synchronized( &internal->dispatchedMessages ) {
-            if( internal->dispatchedMessages.empty() ) {
+        synchronized( &this->internal->dispatchedMessages ) {
+            if( this->internal->dispatchedMessages.empty() ) {
                 return;
             }
 
             // Only increase the redelivery delay after the first redelivery..
-            Pointer<MessageDispatch> lastMsg = internal->dispatchedMessages.front();
+            Pointer<MessageDispatch> lastMsg = this->internal->dispatchedMessages.front();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
-                internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
+                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getNextRedeliveryDelay(
internal->redeliveryDelay );
             } else {
-                internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
+                this->internal->redeliveryDelay = this->internal->redeliveryPolicy->getInitialRedeliveryDelay();
             }
 
             Pointer<MessageId> firstMsgId =
-                internal->dispatchedMessages.back()->getMessage()->getMessageId();
+                this->internal->dispatchedMessages.back()->getMessage()->getMessageId();
 
             std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
 
@@ -966,15 +976,15 @@ void ActiveMQConsumer::rollback() {
                 ack->setAckType( ActiveMQConstants::ACK_TYPE_POISON );
                 ack->setConsumerId( this->consumerInfo->getConsumerId() );
                 ack->setDestination( lastMsg->getDestination() );
-                ack->setMessageCount( (int)internal->dispatchedMessages.size() );
+                ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
                 ack->setLastMessageId( lastMsg->getMessage()->getMessageId() );
                 ack->setFirstMessageId( firstMsgId );
 
                 session->oneway( ack );
                 // Adjust the window size.
-                internal->additionalWindowSize =
-                    Math::max( 0, internal->additionalWindowSize - (int)internal->dispatchedMessages.size()
);
-                internal->redeliveryDelay = 0;
+                this->internal->additionalWindowSize =
+                    Math::max( 0, this->internal->additionalWindowSize - (int)this->internal->dispatchedMessages.size()
);
+                this->internal->redeliveryDelay = 0;
 
             } else {
 
@@ -984,7 +994,7 @@ void ActiveMQConsumer::rollback() {
                     ack->setAckType( ActiveMQConstants::ACK_TYPE_REDELIVERED );
                     ack->setConsumerId( this->consumerInfo->getConsumerId() );
                     ack->setDestination( lastMsg->getDestination() );
-                    ack->setMessageCount( (int)internal->dispatchedMessages.size()
);
+                    ack->setMessageCount( (int)this->internal->dispatchedMessages.size()
);
                     ack->setLastMessageId( lastMsg->getMessage()->getMessageId()
);
                     ack->setFirstMessageId( firstMsgId );
 
@@ -992,15 +1002,15 @@ void ActiveMQConsumer::rollback() {
                 }
 
                 // stop the delivery of messages.
-                internal->unconsumedMessages->stop();
+                this->internal->unconsumedMessages->stop();
 
-                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
internal->dispatchedMessages.iterator() );
+                std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter(
this->internal->dispatchedMessages.iterator() );
 
                 while( iter->hasNext() ) {
-                    internal->unconsumedMessages->enqueueFirst( iter->next() );
+                    this->internal->unconsumedMessages->enqueueFirst( iter->next()
);
                 }
 
-                if( internal->redeliveryDelay > 0 && !internal->unconsumedMessages->isClosed()
) {
+                if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()
) {
                     // TODO
                     // Start up the delivery again a little later.
                     //scheduler.executeAfterDelay(new Runnable() {
@@ -1020,13 +1030,13 @@ void ActiveMQConsumer::rollback() {
                 }
 
             }
-            internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
-            internal->dispatchedMessages.clear();
+            this->internal->deliveredCounter -= (int)internal->dispatchedMessages.size();
+            this->internal->dispatchedMessages.clear();
         }
     }
 
     if( this->internal->listener != NULL ) {
-        session->redispatch( *internal->unconsumedMessages );
+        session->redispatch( *this->internal->unconsumedMessages );
     }
 }
 
@@ -1035,17 +1045,17 @@ void ActiveMQConsumer::dispatch( const P
 
     try {
 
-        synchronized( internal->unconsumedMessages.get() ) {
+        synchronized( this->internal->unconsumedMessages.get() ) {
 
             clearMessagesInProgress();
             if( this->internal->clearDispatchList ) {
                 // we are reconnecting so lets flush the in progress
                 // messages
-                internal->clearDispatchList = false;
-                internal->unconsumedMessages->clear();
+                this->internal->clearDispatchList = false;
+                this->internal->unconsumedMessages->clear();
             }
 
-            if( !internal->unconsumedMessages->isClosed() ) {
+            if( !this->internal->unconsumedMessages->isClosed() ) {
 
                 // Don't dispatch expired messages, ack it and then destroy it
                 if( dispatch->getMessage()->isExpired() ) {
@@ -1055,7 +1065,7 @@ void ActiveMQConsumer::dispatch( const P
                     return;
                 }
 
-                synchronized( &internal->listenerMutex ) {
+                synchronized( &this->internal->listenerMutex ) {
                     // If we have a listener, send the message.
                     if( this->internal->listener != NULL && internal->unconsumedMessages->isRunning()
) {
 
@@ -1122,7 +1132,7 @@ void ActiveMQConsumer::checkClosed() con
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConsumer::iterate() {
 
-    synchronized( &internal->listenerMutex ) {
+    synchronized( &this->internal->listenerMutex ) {
 
         if( this->internal->listener != NULL ) {
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Thu
Oct 21 21:48:12 2010
@@ -146,6 +146,13 @@ namespace core{
         void doClose();
 
         /**
+         * Cleans up this objects internal resources.
+         *
+         * @throw ActiveMQException if an error occurs while performing the operation.
+         */
+        void dispose();
+
+        /**
          * Get the Consumer information for this consumer
          * @return Reference to a Consumer Info Object
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Thu
Oct 21 21:48:12 2010
@@ -98,8 +98,7 @@ void ActiveMQProducer::close() {
 
         if( !this->isClosed() ) {
 
-            this->session->removeProducer( this->producerInfo->getProducerId()
);
-            this->closed = true;
+            dispose();
 
             // Remove at the Broker Side, if this fails the producer has already
             // been removed from the session and connection objects so its safe
@@ -113,6 +112,15 @@ void ActiveMQProducer::close() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::dispose() {
+
+    if( !this->isClosed() ) {
+        this->session->removeProducer( this->producerInfo->getProducerId() );
+        this->closed = true;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQProducer::send( cms::Message* message ) {
 
     try {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Thu
Oct 21 21:48:12 2010
@@ -242,6 +242,14 @@ namespace core{
          */
         virtual void onProducerAck( const commands::ProducerAck& ack );
 
+        /**
+         * Performs Producer object cleanup but doesn't attempt to send the Remove command
+         * to the broker.  Called when the parent resource if closed first to avoid the message
+         * send and avoid any exceptions that might be thrown from an attempt to send a remove
+         * command to a failed transport.
+         */
+        void dispose();
+
    private:
 
        // Checks for the closed state and throws if so.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu
Oct 21 21:48:12 2010
@@ -119,6 +119,53 @@ void ActiveMQSession::close() {
     }
 
     try {
+        doClose();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::doClose() {
+
+    try {
+        dispose();
+
+        // Remove this session from the Broker.
+        Pointer<RemoveInfo> info( new RemoveInfo() );
+        info->setObjectId( this->sessionInfo->getSessionId() );
+        info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
+        this->connection->oneway( info );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::dispose() {
+
+    class Finalizer {
+    private:
+
+        ActiveMQSession* session;
+        ActiveMQConnection* connection;
+
+    public:
+
+        Finalizer(ActiveMQSession* session, ActiveMQConnection* connection) {
+            this->session = session;
+            this->connection = connection;
+        }
+
+        ~Finalizer() {
+            this->connection->removeSession(this->session);
+            this->session->closed = true;
+        }
+    };
+
+    try{
+
+        Finalizer final(this, this->connection);
 
         // Stop the dispatch executor.
         stop();
@@ -129,47 +176,40 @@ void ActiveMQSession::close() {
             this->transaction->rollback();
         }
 
-        // Close all Consumers
+        // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
         synchronized( &this->consumers ) {
 
             std::vector<ActiveMQConsumer*> closables = this->consumers.values();
 
             for( std::size_t i = 0; i < closables.size(); ++i ) {
                 try{
-                    closables[i]->close();
+                    closables[i]->setFailureError(this->connection->getFirstFailureError());
+                    closables[i]->dispose();
+                    this->lastDeliveredSequenceId =
+                        Math::max( this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId()
);
                 } catch( cms::CMSException& ex ){
                     /* Absorb */
                 }
             }
         }
 
-        // Close all Producers
+        // Dispose of all Producers, the dispose method skips the RemoveInfo command.
         synchronized( &this->producers ) {
 
             std::vector<ActiveMQProducer*> closables = this->producers.values();
 
             for( std::size_t i = 0; i < closables.size(); ++i ) {
                 try{
-                    closables[i]->close();
+                    closables[i]->dispose();
                 } catch( cms::CMSException& ex ){
                     /* Absorb */
                 }
             }
         }
-
-        // Now indicate that this session is closed.
-        closed = true;
-
-        // Remove this sessions from the connection
-        this->connection->removeSession( this );
-
-        // Remove this session from the Broker.
-        Pointer<RemoveInfo> info( new RemoveInfo() );
-        info->setObjectId( this->sessionInfo->getSessionId() );
-        info->setLastDeliveredSequenceId( this->lastDeliveredSequenceId );
-        this->connection->oneway( info );
     }
-    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -919,7 +959,7 @@ void ActiveMQSession::addConsumer( Activ
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId, long
long lastDeliveredSequenceId ) {
+void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId ) {
 
     try{
 
@@ -933,8 +973,6 @@ void ActiveMQSession::removeConsumer( co
                 // the Connection.
                 this->connection->removeDispatcher( consumerId );
                 this->consumers.remove( consumerId );
-                this->lastDeliveredSequenceId =
-                    Math::max( this->lastDeliveredSequenceId, lastDeliveredSequenceId
);
             }
         }
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1026151&r1=1026150&r2=1026151&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu
Oct 21 21:48:12 2010
@@ -364,12 +364,10 @@ namespace core{
          *
          * @param consumerId
          *      The ConsumerId of the MessageConsumer to remove from this Session.
-         * @param lastDeliveredSequenceId
-         *      The sequenceId of the last Message the consumer delivered.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeConsumer( const Pointer<commands::ConsumerId>& consumerId, long
long lastDeliveredSequenceId = 0 );
+        void removeConsumer( const Pointer<commands::ConsumerId>& consumerId );
 
         /**
          * Adds a MessageProducer to this session registering it with the Connection and
store
@@ -447,6 +445,22 @@ namespace core{
          */
         Pointer<commands::ProducerId> getNextProducerId();
 
+        /**
+         * Performs the actual Session close operations.  This method is meant for use
+         * by ActiveMQConnection, the connection object calls this when it has been
+         * closed to skip some of the extraneous processing done by the client level
+         * close method.
+         */
+        void doClose();
+
+        /**
+         * Cleans up the Session object's resources without attempting to send the
+         * Remove command to the broker, this can be called from ActiveMQConnection when
+         * it knows that the transport is down and the doClose method would throw an
+         * exception when it attempt to send the Remove Command.
+         */
+        void dispose();
+
    private:
 
        /**



Mime
View raw message