activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508774 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQSession.cpp ActiveMQSession.h
Date Sat, 17 Feb 2007 15:55:58 GMT
Author: tabish
Date: Sat Feb 17 07:55:57 2007
New Revision: 508774

URL: http://svn.apache.org/viewvc?view=rev&rev=508774
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=508774&r1=508773&r2=508774
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sat Feb 17 07:55:57 2007
@@ -54,7 +54,7 @@
     this->connection   = connection;
     this->closed       = false;
     this->asyncThread  = NULL;
-    this->useAsyncSend = Boolean::parseBoolean( 
+    this->useAsyncSend = Boolean::parseBoolean(
         properties.getProperty( "useAsyncSend", "false" ) );
 
     // If we are in Async Send Mode we need to start the Thread
@@ -66,7 +66,7 @@
     // Create a Transaction object only if the session is transactional
     if( isTransacted() )
     {
-        transaction = 
+        transaction =
             new ActiveMQTransaction(connection, this, properties );
     }
 }
@@ -98,7 +98,7 @@
         synchronized( &closableSessionResources ) {
             allResources = closableSessionResources.toArray();
         }
-        
+
         // Close all of the resources.
         for( unsigned int ix=0; ix<allResources.size(); ++ix ){
             cms::Closeable* resource = allResources[ix];
@@ -114,14 +114,14 @@
             delete transaction;
             transaction = NULL;
         }
-                
+
         // Destroy this sessions resources
         connection->removeSession( this );
         sessionInfo = NULL;
-        
+
         // Now indicate that this session is closed.
         closed = true;
-        
+
         // Stop the Async Thread if its running
         stopThread();
 
@@ -230,17 +230,17 @@
         // Create the consumer instance.
         ActiveMQConsumer* consumer = new ActiveMQConsumer(
             connection->getConnectionData()->getConnector()->
-                createConsumer( destination, 
-                                sessionInfo, 
-                                selector, 
+                createConsumer( destination,
+                                sessionInfo,
+                                selector,
                                 noLocal ), this );
-                                
+
         // Add the consumer to the map of closeable session resources.
         synchronized( &closableSessionResources ) {
             closableSessionResources.add( consumer );
-        }   
+        }
 
-        // Register this consumer as a listener of messages from the 
+        // Register this consumer as a listener of messages from the
         // connection.
         connection->addMessageListener(
             consumer->getConsumerInfo()->getConsumerId(), consumer );
@@ -277,7 +277,7 @@
         synchronized( &closableSessionResources ) {
             closableSessionResources.add( consumer );
         }
-        
+
         // Register the consumer as a listener of messages from the
         // connection.
         connection->addMessageListener(
@@ -307,12 +307,12 @@
         ActiveMQProducer* producer = new ActiveMQProducer(
             connection->getConnectionData()->getConnector()->
                 createProducer( destination, sessionInfo ), this );
-                
+
         // Add the producer to the map of closeable session resources.
         synchronized( &closableSessionResources ) {
             closableSessionResources.add( producer );
         }
-        
+
         return producer;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -373,8 +373,22 @@
                 "Session Already Closed" );
         }
 
-        return connection->getConnectionData()->
-            getConnector()->createTemporaryQueue( sessionInfo );
+        // Create the consumer instance.
+        cms::TemporaryQueue* queue =
+            connection->getConnectionData()->
+                getConnector()->createTemporaryQueue( sessionInfo );
+
+        // Check if this object is closeable, if so we add it to our map
+        // of closeable resources so that it gets cleaned up.
+        if( dynamic_cast<cms::Closeable*>( queue ) != NULL ) {
+            // Add the consumer to the map of closeable session resources.
+            synchronized( &closableSessionResources ) {
+                closableSessionResources.add(
+                    dynamic_cast<cms::Closeable*>( queue ) );
+            }
+        }
+
+        return queue;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -394,15 +408,29 @@
                 "Session Already Closed" );
         }
 
-        return connection->getConnectionData()->
-            getConnector()->createTemporaryTopic( sessionInfo );
+        // Create the consumer instance.
+        cms::TemporaryTopic* topic =
+            connection->getConnectionData()->
+                getConnector()->createTemporaryTopic( sessionInfo );
+
+        // Check if this object is closeable, if so we add it to our map
+        // of closeable resources so that it gets cleaned up.
+        if( dynamic_cast<cms::Closeable*>( topic ) != NULL ) {
+            // Add the consumer to the map of closeable session resources.
+            synchronized( &closableSessionResources ) {
+                closableSessionResources.add(
+                    dynamic_cast<cms::Closeable*>( topic ) );
+            }
+        }
+
+        return topic;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQSession::createMessage() 
+cms::Message* ActiveMQSession::createMessage()
     throw ( cms::CMSException )
 {
     try
@@ -422,7 +450,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::BytesMessage* ActiveMQSession::createBytesMessage() 
+cms::BytesMessage* ActiveMQSession::createBytesMessage()
     throw ( cms::CMSException )
 {
     try
@@ -444,7 +472,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 cms::BytesMessage* ActiveMQSession::createBytesMessage(
     const unsigned char* bytes,
-    std::size_t bytesSize ) 
+    std::size_t bytesSize )
         throw ( cms::CMSException )
 {
     try
@@ -460,7 +488,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage() 
+cms::TextMessage* ActiveMQSession::createTextMessage()
     throw ( cms::CMSException )
 {
     try
@@ -480,7 +508,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text ) 
+cms::TextMessage* ActiveMQSession::createTextMessage( const std::string& text )
     throw ( cms::CMSException )
 {
     try
@@ -496,7 +524,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MapMessage* ActiveMQSession::createMapMessage() 
+cms::MapMessage* ActiveMQSession::createMapMessage()
     throw ( cms::CMSException )
 {
     try
@@ -519,14 +547,14 @@
 ////////////////////////////////////////////////////////////////////////////////
 cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode() const
 {
-    return sessionInfo != NULL ? 
+    return sessionInfo != NULL ?
         sessionInfo->getAckMode() : Session::AUTO_ACKNOWLEDGE;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQSession::isTransacted() const
 {
-    return sessionInfo != NULL ? 
+    return sessionInfo != NULL ?
         sessionInfo->getAckMode() == Session::SESSION_TRANSACTED : false;
 }
 
@@ -547,13 +575,13 @@
         // Stores the Message and its consumer in the tranasction, if the
         // session is a transactional one.
         if( isTransacted() )
-        {      
+        {
             transaction->addToTransaction( message, consumer );
         }
 
         // Delegate to connector to ack this message.
         return connection->getConnectionData()->
-            getConnector()->acknowledge( 
+            getConnector()->acknowledge(
                 sessionInfo, dynamic_cast< cms::Message* >( message ) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -594,7 +622,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::onDestroySessionResource( 
+void ActiveMQSession::onDestroySessionResource(
     ActiveMQSessionResource* resource )
         throw ( cms::CMSException )
 {
@@ -607,7 +635,7 @@
                 "ActiveMQSession::onProducerClose - Session Already Closed");
         }
 
-        ActiveMQConsumer* consumer = 
+        ActiveMQConsumer* consumer =
             dynamic_cast< ActiveMQConsumer*>( resource );
 
         if( consumer != NULL )
@@ -623,14 +651,14 @@
                 transaction->removeFromTransaction( consumer );
             }
         }
-        
+
         // Remove the entry from the session resource map if it's there
         cms::Closeable* closeableResource = dynamic_cast<cms::Closeable*>(resource);
         if( closeableResource != NULL ){
             synchronized( &closableSessionResources ) {
                 closableSessionResources.remove( closeableResource );
-            }   
-        }     
+            }
+        }
 
         // Free its resources.
         connection->getConnectionData()->
@@ -655,9 +683,9 @@
 void ActiveMQSession::run()
 {
     try{
-        
+
         while( !closed )
-        {    
+        {
             std::pair<Message*, ActiveMQProducer*> messagePair;
 
             synchronized( &msgQueue )
@@ -674,21 +702,21 @@
                     }
                     msgQueue.wait();
                 }
-                
+
                 // don't want to process messages if we are shutting down.
                 if( closed )
                 {
                     return;
                 }
-                
+
                 // get the data
                 messagePair = msgQueue.pop();
             }
 
             // Dispatch the message
             connection->getConnectionData()->
-                getConnector()->send( 
-                    messagePair.first, 
+                getConnector()->send(
+                    messagePair.first,
                     messagePair.second->getProducerInfo() );
 
             // Destroy Our copy of the message
@@ -698,7 +726,7 @@
     catch(...)
     {
         cms::ExceptionListener* listener = this->getExceptionListener();
-        
+
         if( listener != NULL )
         {
             listener->onException( ActiveMQException(
@@ -706,34 +734,34 @@
                 "ActiveMQSession::run - "
                 "Connector threw an unknown Exception, recovering..." ) );
         }
-    }        
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::startThread() throw ( ActiveMQException ) {
-    
+
     try
     {
         // Start the thread, if it's not already started.
         if( asyncThread == NULL )
         {
-            asyncThread = new Thread( this );        
-            asyncThread->start();                        
+            asyncThread = new Thread( this );
+            asyncThread->start();
         }
-    }        
+    }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::stopThread() throw ( ActiveMQException ) {
-    
+
     try
     {
         // if the thread is running signal it to quit and then
         // wait for run to return so thread can die
         if( asyncThread != NULL )
-        {                        
+        {
             synchronized( &msgQueue )
             {
                 // Force a wakeup if run is in a wait.
@@ -745,7 +773,7 @@
             delete asyncThread;
             asyncThread = NULL;
         }
-    }        
+    }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
@@ -760,7 +788,7 @@
             while( !msgQueue.empty() )
             {
                 // destroy these messages if this is not a transacted
-                // session, if it is then the tranasction will clean 
+                // session, if it is then the tranasction will clean
                 // the messages up.
                 delete msgQueue.pop().first;
             }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=508774&r1=508773&r2=508774
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sat Feb 17 07:55:57 2007
@@ -36,8 +36,8 @@
     class ActiveMQMessage;
     class ActiveMQProducer;
     class ActiveMQConsumer;
-   
-    class ActiveMQSession : 
+
+    class ActiveMQSession :
         public cms::Session,
         public concurrent::Runnable
     {
@@ -64,7 +64,9 @@
         bool closed;
 
         /**
-         * The set of closable session resources (consumers and producers).
+         * The set of closable session resources;
+         * This can consist of consumers and producers and sometimes
+         * destination.
          */
         util::Set<cms::Closeable*> closableSessionResources;
 
@@ -81,34 +83,34 @@
         /**
          * Outgoing Message Queue
          */
-        util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
       
+        util::Queue< std::pair<cms::Message*, ActiveMQProducer*> > msgQueue;
 
     public:
-   
+
         ActiveMQSession( connector::SessionInfo* sessionInfo,
                          const util::Properties& properties,
                          ActiveMQConnection* connection );
-   
+
         virtual ~ActiveMQSession();
 
     public:   // Implements Mehtods
-   
+
         /**
          * Closes this session as well as any active child consumers or
          * producers.
          * @throws CMSException
          */
         virtual void close() throw ( cms::CMSException );
-      
+
         /**
-         * Commits all messages done in this transaction and releases any 
+         * Commits all messages done in this transaction and releases any
          * locks currently held.
          * @throws CMSException
          */
         virtual void commit() throw ( cms::CMSException );
 
         /**
-         * Rollsback all messages done in this transaction and releases any 
+         * Rollsback all messages done in this transaction and releases any
          * locks currently held.
          * @throws CMSException
          */
@@ -124,7 +126,7 @@
                 throw ( cms::CMSException );
 
         /**
-         * Creates a MessageConsumer for the specified destination, using a 
+         * Creates a MessageConsumer for the specified destination, using a
          * message selector.
          * @param the Destination that this consumer receiving messages for.
          * @param the Message Selector string to use for this destination
@@ -135,13 +137,13 @@
             const std::string& selector )
                 throw ( cms::CMSException );
         /**
-         * Creates a MessageConsumer for the specified destination, using a 
+         * Creates a MessageConsumer for the specified destination, using a
          * message selector.
          * @param the Destination that this consumer receiving messages for.
          * @param the Message Selector string to use for this destination
-         * @param if true, and the destination is a topic, inhibits the 
-         *        delivery of messages published by its own connection. The 
-         *        behavior for NoLocal is not specified if the destination is 
+         * @param if true, and the destination is a topic, inhibits the
+         *        delivery of messages published by its own connection. The
+         *        behavior for NoLocal is not specified if the destination is
          *        a queue.
          * @throws CMSException
          */
@@ -150,9 +152,9 @@
             const std::string& selector,
             bool noLocal )
                 throw ( cms::CMSException );
-         
+
         /**
-         * Creates a durable subscriber to the specified topic, using a 
+         * Creates a durable subscriber to the specified topic, using a
          * message selector
          * @param the topic to subscribe to
          * @param name used to identify the subscription
@@ -167,7 +169,7 @@
                 throw ( cms::CMSException );
 
         /**
-         * Creates a MessageProducer to send messages to the specified 
+         * Creates a MessageProducer to send messages to the specified
          * destination.
          * @param the Destination to publish on
          * @throws CMSException
@@ -175,7 +177,7 @@
         virtual cms::MessageProducer* createProducer(
             const cms::Destination* destination )
                 throw ( cms::CMSException );
-         
+
         /**
          * Creates a queue identity given a Queue name.
          * @param the name of the new Queue
@@ -183,7 +185,7 @@
          */
         virtual cms::Queue* createQueue( const std::string& queueName )
             throw ( cms::CMSException );
-      
+
         /**
          * Creates a topic identity given a Queue name.
          * @param the name of the new Topic
@@ -205,19 +207,19 @@
          */
         virtual cms::TemporaryTopic* createTemporaryTopic()
             throw ( cms::CMSException );
-         
+
         /**
          * Creates a new Message
          * @throws CMSException
          */
-        virtual cms::Message* createMessage() 
+        virtual cms::Message* createMessage()
             throw ( cms::CMSException );
 
         /**
          * Creates a BytesMessage
          * @throws CMSException
          */
-        virtual cms::BytesMessage* createBytesMessage() 
+        virtual cms::BytesMessage* createBytesMessage()
             throw ( cms::CMSException );
 
         /**
@@ -226,31 +228,31 @@
          * @param the size of the bytes array, or number of bytes to use
          * @throws CMSException
          */
-        virtual cms::BytesMessage* createBytesMessage( 
+        virtual cms::BytesMessage* createBytesMessage(
             const unsigned char* bytes,
-            std::size_t bytesSize ) 
+            std::size_t bytesSize )
                 throw ( cms::CMSException );
 
         /**
          * Creates a new TextMessage
          * @throws CMSException
          */
-        virtual cms::TextMessage* createTextMessage() 
+        virtual cms::TextMessage* createTextMessage()
             throw ( cms::CMSException );
-      
+
         /**
          * Creates a new TextMessage and set the text to the value given
          * @param the initial text for the message
          * @throws CMSException
          */
-        virtual cms::TextMessage* createTextMessage( const std::string& text ) 
+        virtual cms::TextMessage* createTextMessage( const std::string& text )
             throw ( cms::CMSException );
 
         /**
          * Creates a new TextMessage
          * @throws CMSException
          */
-        virtual cms::MapMessage* createMapMessage() 
+        virtual cms::MapMessage* createMapMessage()
             throw ( cms::CMSException );
 
         /**
@@ -258,15 +260,15 @@
          * @return the Sessions Acknowledge Mode
          */
         virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
-      
+
         /**
          * Gets if the Sessions is a Transacted Session
          * @return transacted true - false.
          */
         virtual bool isTransacted() const;
-          
+
    public:   // ActiveMQSession specific Methods
-   
+
         /**
          * Sends a message from the Producer specified
          * @param cms::Message pointer
@@ -275,11 +277,11 @@
          */
         virtual void send( cms::Message* message, ActiveMQProducer* producer )
             throw ( cms::CMSException );
-         
+
         /**
-         * When a ActiveMQ core object is closed or destroyed it should call 
-         * back and let the session know that it is going away, this allows 
-         * the session to clean up any associated resources.  This method 
+         * When a ActiveMQ core object is closed or destroyed it should call
+         * back and let the session know that it is going away, this allows
+         * the session to clean up any associated resources.  This method
          * destroy's the data that is associated with a Producer object
          * @param The Producer that is being destoryed
          * @throw CMSException
@@ -288,7 +290,7 @@
             throw ( cms::CMSException );
 
         /**
-         * Called to acknowledge the receipt of a message.  
+         * Called to acknowledge the receipt of a message.
          * @param The consumer that received the message
          * @param The Message to acknowledge.
          * @throws CMSException
@@ -296,7 +298,7 @@
         virtual void acknowledge( ActiveMQConsumer* consumer,
                                   ActiveMQMessage* message )
             throw ( cms::CMSException );
-         
+
         /**
          * This method gets any registered exception listener of this sessions
          * connection and returns it.  Mainly intended for use by the objects
@@ -322,9 +324,9 @@
          * is registered with a Thread and started.  This function reads from
          * the outgoing message queue and dispatches calls to the connector that
          * is registered with this class.
-         */            
+         */
         virtual void run();
-        
+
         /**
          * Starts the message processing thread to receive messages
          * asynchronously.  This thread is started when setMessageListener
@@ -332,7 +334,7 @@
          * consumer asynchronously instead of synchronously (receive).
          */
         void startThread() throw ( exceptions::ActiveMQException );
-        
+
         /**
          * Stops the asynchronous message processing thread if it's started.
          */



Mime
View raw message