activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r737433 - in /activemq/activemq-cpp/trunk/src/main/activemq: commands/ core/
Date Sat, 24 Jan 2009 20:32:14 GMT
Author: tabish
Date: Sat Jan 24 20:32:13 2009
New Revision: 737433

URL: http://svn.apache.org/viewvc?rev=737433&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-100

Improvements to the shutdown code.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.cpp Sat
Jan 24 20:32:13 2009
@@ -17,6 +17,7 @@
 #include <activemq/commands/ActiveMQTempDestination.h>
 
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnection.h>
 
 using namespace std;
 using namespace activemq;
@@ -45,7 +46,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTempDestination::close() throw( cms::CMSException ) {
     try {
-        // TODO - Dispose of this Temp Dest.
+        if( this->connection != NULL ) {
+            this->connection->destroyDestination( this );
+        }
     }
     AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
     AMQ_CATCHALL_THROW( exceptions::ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/commands/ActiveMQTempDestination.h Sat Jan
24 20:32:13 2009
@@ -30,6 +30,9 @@
 #include <string>
 
 namespace activemq{
+namespace core{
+    class ActiveMQConnection;
+}
 namespace commands{
 
     class AMQCPP_API ActiveMQTempDestination : public ActiveMQDestination,
@@ -37,10 +40,10 @@
     protected:
 
         /**
-         * Connector that we call back on close to allow this resource to
+         * Connection that we call back on close to allow this resource to
          * be cleaned up correctly at this end and at the Broker End.
          */
-        // TODO - Add something to ask for a way to send a dispose
+        core::ActiveMQConnection* connection;
 
     public:
 
@@ -99,11 +102,22 @@
 
         /**
          * Closes down this Destination resulting in a call to dispose of the
-         * TempDestination resource at the Broker.
+         * TempDestination resource at the Broker.  This should only be called
+         * when the user is certain that they are finished with this destination.
+         * The TempDestination is not closed automatically on shutdown.
          * throws cms::CMSException
          */
         virtual void close() throw( cms::CMSException );
 
+        /**
+         * Sets the Parent Connection that is notified when this destination is
+         * destroyed.
+         * @param connection - The parent connection.
+         */
+        void setConnection( core::ActiveMQConnection* connection ) {
+            this->connection = connection;
+        }
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Sat Jan 24 20:32:13
2009
@@ -373,7 +373,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::destroyDestination( const cms::Destination* destination )
+void ActiveMQConnection::destroyDestination( const commands::ActiveMQDestination* destination
)
     throw( decaf::lang::exceptions::NullPointerException,
            decaf::lang::exceptions::IllegalStateException,
            decaf::lang::exceptions::UnsupportedOperationException,
@@ -388,14 +388,11 @@
 
         enforceConnected();
 
-        const commands::ActiveMQDestination* amqDestination =
-            dynamic_cast<const commands::ActiveMQDestination*>( destination );
-
         commands::DestinationInfo command;
 
         command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure()
);
         command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
-        command.setDestination( amqDestination->cloneDataStructure() );
+        command.setDestination( destination->cloneDataStructure() );
 
         // Send the message to the broker.
         syncRequest( &command );
@@ -408,6 +405,34 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::destroyDestination( const cms::Destination* destination )
+    throw( decaf::lang::exceptions::NullPointerException,
+           decaf::lang::exceptions::IllegalStateException,
+           decaf::lang::exceptions::UnsupportedOperationException,
+           activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        if( destination == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__, "Destination passed was NULL" );
+        }
+
+        enforceConnected();
+
+        const commands::ActiveMQDestination* amqDestination =
+            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+
+        this->destroyDestination( amqDestination );
+    }
+    AMQ_CATCH_RETHROW( NullPointerException )
+    AMQ_CATCH_RETHROW( IllegalStateException )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::onCommand( transport::Command* command ) {
 
     try{

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Sat Jan 24 20:32:13
2009
@@ -196,6 +196,30 @@
          * @throws ActiveMQException
          *         If any other error occurs during the attempt to destroy the destination.
          */
+        virtual void destroyDestination( const commands::ActiveMQDestination* destination
)
+            throw( decaf::lang::exceptions::NullPointerException,
+                   decaf::lang::exceptions::IllegalStateException,
+                   decaf::lang::exceptions::UnsupportedOperationException,
+                   activemq::exceptions::ActiveMQException );
+
+        /**
+         * Requests that the Broker removes the given Destination.  Calling this
+         * method implies that the client is finished with the Destination and that
+         * no other messages will be sent or received for the given Destination.  The
+         * Broker frees all resources it has associated with this Destination.
+         *
+         * @param destination
+         *        The CMS Destination the Broker will be requested to remove.
+         *
+         * @throws NullPointerException
+         *         If the passed Destination is Null
+         * @throws IllegalStateException
+         *         If the connection is closed.
+         * @throws UnsupportedOperationException
+         *         If the wire format in use does not support this operation.
+         * @throws ActiveMQException
+         *         If any other error occurs during the attempt to destroy the destination.
+         */
         virtual void destroyDestination( const cms::Destination* destination )
             throw( decaf::lang::exceptions::NullPointerException,
                    decaf::lang::exceptions::IllegalStateException,

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sat Jan 24 20:32:13
2009
@@ -55,16 +55,12 @@
             "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
     }
 
-    // Init Producer Data
+    // Initialize Producer Data
     this->session = session;
     this->transaction = transaction;
     this->consumerInfo.reset( consumerInfo );
     this->listener = NULL;
     this->closed = false;
-
-    // TODO - How to Detect Close
-    // Listen for our resource to close
-    //this->consumerInfo->addListener( this );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -83,33 +79,18 @@
 
     try{
 
-        if( !closed ) {
+        if( !this->isClosed() ) {
+
+            // Remove this Consumer from the Connections set of Dispatchers and then
+            // remove it from the Broker.
+            this->session->disposeOf( this->getConsumerId() );
+
+            this->closed = true;
 
             // Identifies any errors encountered during shutdown.
             bool haveException = false;
             ActiveMQException error;
 
-            // TODO
-            // Close the ConsumerInfo
-//            if( !consumerInfo->isClosed() ) {
-//                try{
-//                    // We don't want a callback now
-//                    this->consumerInfo->removeListener( this );
-//                    this->consumerInfo->close();
-//                } catch( ActiveMQException& ex ){
-//                    if( !haveException ){
-//                        ex.setMark( __FILE__, __LINE__ );
-//                        error = ex;
-//                        haveException = true;
-//                    }
-//                }
-//            }
-
-            // Remove from Broker.
-            this->session->getConnection()->disposeOf( this->consumerInfo->getConsumerId()
);
-
-            closed = true;
-
             // Purge all the pending messages
             try{
                 purgeMessages();
@@ -563,7 +544,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::checkClosed() throw( exceptions::ActiveMQException ) {
-    if( closed ) {
+    if( this->isClosed() ) {
         throw ActiveMQException(
             __FILE__, __LINE__,
             "ActiveMQConsumer - Consumer Already Closed" );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Sat Jan 24 20:32:13
2009
@@ -182,17 +182,36 @@
          * Get the Consumer information for this consumer
          * @return Pointer to a Consumer Info Object
          */
-        virtual commands::ConsumerInfo* getConsumerInfo() {
+        commands::ConsumerInfo* getConsumerInfo() {
             return consumerInfo.get();
         }
 
+        /**
+         * Get the Consumer Id for this consumer
+         * @return Pointer to a Consumer Id Object
+         */
+        commands::ConsumerId* getConsumerId() {
+            if( this->isClosed() ) {
+                return NULL;
+            }
+
+            return consumerInfo->getConsumerId();
+        }
+
+        /**
+         * @returns if this Consumer has been closed.
+         */
+        bool isClosed() const {
+            return this->closed;
+        }
+
     protected:
 
         /**
          * Purges all messages currently in the queue.  This can be as a
          * result of a rollback, or of the consumer being shutdown.
          */
-        virtual void purgeMessages() throw (exceptions::ActiveMQException);
+        void purgeMessages() throw ( exceptions::ActiveMQException );
 
         /**
          * Used by synchronous receive methods to wait for messages to come in.
@@ -213,14 +232,14 @@
          * Pre-consume processing
          * @param message - the message being consumed.
          */
-        virtual void beforeMessageIsConsumed( ActiveMQMessage* message );
+        void beforeMessageIsConsumed( ActiveMQMessage* message );
 
         /**
          * Post-consume processing
          * @param message - the consumed message
          * @param messageExpired - flag indicating if the message has expired.
          */
-        virtual void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired
);
+        void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired );
 
     private:
 
@@ -233,7 +252,7 @@
          * for a server round-trip in that instance.
          * @param timeout - the time that the client is willing to wait.
          */
-        virtual void sendPullRequest( long long timeout )
+        void sendPullRequest( long long timeout )
             throw ( exceptions::ActiveMQException );
 
         // Checks for the closed state and throws if so.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp Sat Jan 24 20:32:13
2009
@@ -54,10 +54,6 @@
     this->disableMessageId = false;
     this->defaultPriority = 4;
     this->defaultTimeToLive = 0;
-
-    // TODO - How to manage resources
-    // Listen for our resource to close
-    //this->producerInfo->addListener( this );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -74,10 +70,10 @@
 
     try{
 
-        if( !closed ) {
+        if( !this->isClosed() ) {
 
-            this->session->getConnection()->disposeOf( this->producerInfo->getProducerId()
);
-            closed = true;
+            this->session->disposeOf( this->producerInfo->getProducerId() );
+            this->closed = true;
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -154,7 +150,7 @@
 
     try {
 
-        checkClosed();
+        this->checkClosed();
 
         if( destination == NULL ) {
 
@@ -189,30 +185,6 @@
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
-// TODO
-////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQProducer::onConnectorResourceClosed(
-//    const ConnectorResource* resource ) throw ( cms::CMSException ) {
-//
-//    try{
-//
-//        checkClosed();
-//
-//        if( resource != producerInfo ) {
-//            throw ActiveMQException(
-//                __FILE__, __LINE__,
-//                "ActiveMQProducer::onConnectorResourceClosed - "
-//                "Unknown object passed to this callback");
-//        }
-//
-//        // If our producer isn't closed already, then lets close
-//        this->close();
-//    }
-//    AMQ_CATCH_RETHROW( ActiveMQException )
-//    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-//    AMQ_CATCHALL_THROW( ActiveMQException )
-//}
-
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQProducer::onProducerAck( const commands::ProducerAck& ack ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h Sat Jan 24 20:32:13
2009
@@ -243,6 +243,13 @@
     public:
 
         /**
+         * @returns true if this Producer has been closed.
+         */
+        bool isClosed() const {
+            return this->closed;
+        }
+
+        /**
          * Retries this object ProducerInfo pointer
          * @return ProducerInfo pointer
          */
@@ -251,6 +258,18 @@
         }
 
         /**
+         * Retries this object ProducerId or NULL if closed.
+         * @return ProducerId pointer
+         */
+        virtual commands::ProducerId* getProducerId(){
+            if( this->isClosed() ) {
+                return NULL;
+            }
+
+            return this->producerInfo->getProducerId();
+        }
+
+        /**
          * Handles the work of Processing a ProducerAck Command from the Broker.
          * @param ack - The ProducerAck message received from the Broker.
          */

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Sat Jan 24 20:32:13
2009
@@ -117,22 +117,33 @@
         // Stop the dispatch executor.
         stop();
 
-        // TODO
-        // Get the complete list of closeable session resources.
-        // Get the complete list of closeable session resources.
-//        synchronized( &closableSessionResources ) {
-//
-//            Iterator<cms::Closeable*>* iter = closableSessionResources.iterator();
-//            while( iter->hasNext() ) {
-//                cms::Closeable* resource = iter->next();
-//                try{
-//                    resource->close();
-//                } catch( cms::CMSException& ex ){
-//                    /* Absorb */
-//                }
-//            }
-//            delete iter;
-//        }
+        // Close all Consumers
+        synchronized( &this->consumers ) {
+
+            std::vector<ActiveMQConsumer*> closables = this->consumers.getValues();
+
+            for( std::size_t i = 0; i < closables.size(); ++i ) {
+                try{
+                    closables[i]->close();
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
+            }
+        }
+
+        // Close all Producers
+        synchronized( &this->producers ) {
+
+            std::vector<ActiveMQProducer*> closables = this->producers.getValues();
+
+            for( std::size_t i = 0; i < closables.size(); ++i ) {
+                try{
+                    closables[i]->close();
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
+            }
+        }
 
         // TODO = Commit it first.
         // Destroy the Transaction
@@ -374,6 +385,9 @@
                 producer->getProducerInfo()->getProducerId()->getValue(), producer.get()
);
         }
 
+        // Add to the Connections list
+        this->connection->addProducer( producer.get() );
+
         return producer.release();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -624,86 +638,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQSession::onConnectorResourceClosed(
-//    const ConnectorResource* resource ) throw ( cms::CMSException ) {
-//
-//    try{
-//
-//        if( closed ) {
-//            throw ActiveMQException(
-//                __FILE__, __LINE__,
-//                "ActiveMQSession::onProducerClose - Session Already Closed");
-//        }
-//
-//        const ConsumerInfo* consumer =
-//            dynamic_cast<const ConsumerInfo*>( resource );
-//
-//        if( consumer != NULL ) {
-//
-//            // If the executor thread is currently running, stop it.
-//            bool wasStarted = isStarted();
-//            if( wasStarted ) {
-//                stop();
-//            }
-//
-//            // Remove the dispatcher for the Connection
-//            connection->removeDispatcher( consumer );
-//
-//            // Remove this consumer from the Transaction if we are transacted
-//            if( transaction != NULL ) {
-//                transaction->removeFromTransaction( consumer->getConsumerId() );
-//            }
-//
-//            ActiveMQConsumer* obj = NULL;
-//            synchronized( &consumers ) {
-//
-//                if( consumers.containsKey( consumer->getConsumerId() ) ) {
-//
-//                    // Get the consumer reference
-//                    obj = consumers.getValue( consumer->getConsumerId() );
-//
-//                    // Remove this consumer from the map.
-//                    consumers.remove( consumer->getConsumerId() );
-//                }
-//            }
-//
-//            // Clean up any resources in the executor for this consumer
-//            if( obj != NULL && executor != NULL ) {
-//
-//                // Purge any pending messages for this consumer.
-//                vector<ActiveMQMessage*> messages =
-//                    executor->purgeConsumerMessages(obj);
-//
-//                // Destroy the messages.
-//                for( unsigned int ix=0; ix<messages.size(); ++ix ) {
-//                    delete messages[ix];
-//                }
-//            }
-//
-//            // If the executor thread was previously running, start it back
-//            // up.
-//            if( wasStarted ) {
-//                start();
-//            }
-//        }
-//
-//        // Remove the entry from the session resource map if it's there
-//        const cms::Closeable* closeable =
-//            dynamic_cast<const cms::Closeable*>( resource );
-//
-//        if( closeable != NULL ){
-//            synchronized( &closableSessionResources ) {
-//                closableSessionResources.remove(
-//                    const_cast<cms::Closeable*>( closeable ) );
-//            }
-//        }
-//    }
-//    AMQ_CATCH_RETHROW( ActiveMQException )
-//    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-//    AMQ_CATCHALL_THROW( ActiveMQException )
-//}
-
-////////////////////////////////////////////////////////////////////////////////
 cms::ExceptionListener* ActiveMQSession::getExceptionListener()
 {
     if( connection != NULL ) {
@@ -940,9 +874,8 @@
         // Send the message to the broker.
         this->connection->syncRequest( &command );
 
-        // TODO - Manage Resources
-        // Now that its setup, link it to this Connector
-        // tempDestination->setConnector( this );
+        // Now that its setup, link it to this Connection so it can be closed.
+        tempDestination->setConnection( this->connection );
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1034,3 +967,82 @@
             "ActiveMQSession - Session Already Closed" );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::disposeOf( commands::ConsumerId* id )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        this->checkClosed();
+
+        synchronized( &this->consumers ) {
+
+            if( this->consumers.containsKey( id->getValue() ) ) {
+
+                // If the executor thread is currently running, stop it.
+                bool wasStarted = isStarted();
+                if( wasStarted ) {
+                    stop();
+                }
+
+                ActiveMQConsumer* consumer = this->consumers.getValue( id->getValue()
);
+                this->connection->removeDispatcher( consumer->getConsumerInfo()
);
+                this->connection->disposeOf( id );
+
+                this->consumers.remove( id->getValue() );
+
+                //TODO
+//              // Remove this consumer from the Transaction if we are transacted
+//              if( transaction != NULL ) {
+//                  transaction->removeFromTransaction( consumer->getConsumerId() );
+//              }
+//
+                // Clean up any resources in the executor for this consumer
+                if( this->executor.get() != NULL ) {
+
+                    // Purge any pending messages for this consumer.
+                    vector<ActiveMQMessage*> messages =
+                        this->executor->purgeConsumerMessages( consumer );
+
+                    // Destroy the messages.
+                    for( unsigned int ix = 0; ix < messages.size(); ++ix ) {
+                        delete messages[ix];
+                    }
+                }
+
+                if( wasStarted ) {
+                    start();
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::disposeOf( commands::ProducerId* id )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        this->checkClosed();
+
+        synchronized( &this->producers ) {
+
+            if( this->producers.containsKey( id->getValue() ) ) {
+
+                ActiveMQProducer* producer = this->producers.getValue( id->getValue()
);
+                this->connection->removeProducer( producer );
+                this->connection->disposeOf( id );
+
+                this->producers.remove( id->getValue() );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=737433&r1=737432&r2=737433&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Sat Jan 24 20:32:13
2009
@@ -26,6 +26,8 @@
 #include <activemq/commands/ActiveMQTempDestination.h>
 #include <activemq/commands/SessionInfo.h>
 #include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/ProducerId.h>
 #include <activemq/commands/TransactionId.h>
 #include <activemq/core/Dispatcher.h>
 
@@ -78,16 +80,11 @@
         decaf::util::Map<long long, ActiveMQConsumer*> consumers;
 
         /**
-         * Map of consumers.
+         * Map of producers.
          */
         decaf::util::Map<long long, ActiveMQProducer*> producers;
 
         /**
-         * Map of consumers.
-         */
-        decaf::util::Map<long long, commands::ActiveMQTempDestination*> tempDestinations;
-
-        /**
          * Sends incoming messages to the registered consumers.
          */
         std::auto_ptr<ActiveMQSessionExecutor> executor;
@@ -412,6 +409,22 @@
         void syncRequest( transport::Command* command, unsigned int timeout = 0 )
             throw ( activemq::exceptions::ActiveMQException );
 
+        /**
+         * Dispose of a Consumer from this session.  Removes it from the Connection
+         * and clean up any resources associated with it.
+         * @param consumerId - the Id of the Consumer to dispose.
+         */
+        void disposeOf( commands::ConsumerId* id )
+            throw ( activemq::exceptions::ActiveMQException );
+
+        /**
+         * Dispose of a Producer from this session.  Removes it from the Connection
+         * and clean up any resources associated with it.
+         * @param consumerId - the Id of the Producer to dispose.
+         */
+        void disposeOf( commands::ProducerId* id )
+            throw ( activemq::exceptions::ActiveMQException );
+
    private:
 
        // Checks for the closed state and throws if so.



Mime
View raw message