activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1305601 [1/5] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/cmsutil/ main/activemq/core/ main/activemq/core/kernels/ main/cms/ test/
Date Mon, 26 Mar 2012 21:11:14 GMT
Author: tabish
Date: Mon Mar 26 21:11:12 2012
New Revision: 1305601

URL: http://svn.apache.org/viewvc?rev=1305601&view=rev
Log:
Break out the core of producers and consumers into objects that can be referenced by both the
session and the proxy producer and consumer, so that deleting a proxy doesn't destroy the real
instances held in the session.  Allows for more robust threading and simpler management of object lifetime.

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
  (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
  (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/ResourceLifecycleManager.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Closeable.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Connection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageProducer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Startable.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Startable.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Mon Mar 26 21:11:12 2012
@@ -101,6 +101,8 @@ cc_sources = \
     activemq/core/PrefetchPolicy.cpp \
     activemq/core/RedeliveryPolicy.cpp \
     activemq/core/SimplePriorityMessageDispatchChannel.cpp \
+    activemq/core/kernels/ActiveMQConsumerKernel.cpp \
+    activemq/core/kernels/ActiveMQProducerKernel.cpp \
     activemq/core/policies/DefaultPrefetchPolicy.cpp \
     activemq/core/policies/DefaultRedeliveryPolicy.cpp \
     activemq/exceptions/ActiveMQException.cpp \
@@ -596,6 +598,8 @@ h_sources = \
     activemq/core/RedeliveryPolicy.h \
     activemq/core/SimplePriorityMessageDispatchChannel.h \
     activemq/core/Synchronization.h \
+    activemq/core/kernels/ActiveMQConsumerKernel.h \
+    activemq/core/kernels/ActiveMQProducerKernel.h \
     activemq/core/policies/DefaultPrefetchPolicy.h \
     activemq/core/policies/DefaultRedeliveryPolicy.h \
     activemq/exceptions/ActiveMQException.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/ResourceLifecycleManager.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/ResourceLifecycleManager.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/ResourceLifecycleManager.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/cmsutil/ResourceLifecycleManager.cpp Mon Mar 26 21:11:12 2012
@@ -71,73 +71,76 @@ void ResourceLifecycleManager::releaseAl
 void ResourceLifecycleManager::destroy() {
 
     try{
-        // Close all the connections.
-        std::auto_ptr< decaf::util::Iterator< cms::Connection* > > connIter(
-            connections.iterator() );
 
-        while( connIter->hasNext() ) {
-            cms::Connection* conn = connIter->next();
-            try {
-                conn->close();
-            } catch(...){}
-        }
-
-        // Destroy the producers.
-        std::auto_ptr< decaf::util::Iterator< cms::MessageProducer* > > prodIter(
-            producers.iterator() );
-
-        while( prodIter->hasNext() ) {
-            cms::MessageProducer* producer = prodIter->next();
-            try {
-                delete producer;
-            } catch( ... ) {}
-        }
-
-        // Destroy the consumers.
-        std::auto_ptr< decaf::util::Iterator< cms::MessageConsumer* > > consIter(
-            consumers.iterator() );
-
-        while( consIter->hasNext() ) {
-            cms::MessageConsumer* consumer = consIter->next();
-            try {
-                delete consumer;
-            } catch( ... ) {}
-        }
-
-        // Destroy the destinations.
-        std::auto_ptr< decaf::util::Iterator< cms::Destination* > > destIter(
-            destinations.iterator() );
+        synchronized(&connections) {
+            // Close all the connections.
+            std::auto_ptr< decaf::util::Iterator< cms::Connection* > > connIter(
+                connections.iterator() );
+
+            while( connIter->hasNext() ) {
+                cms::Connection* conn = connIter->next();
+                try {
+                    conn->close();
+                } catch(...){}
+            }
+
+            // Destroy the producers.
+            std::auto_ptr< decaf::util::Iterator< cms::MessageProducer* > > prodIter(
+                producers.iterator() );
+
+            while( prodIter->hasNext() ) {
+                cms::MessageProducer* producer = prodIter->next();
+                try {
+                    delete producer;
+                } catch( ... ) {}
+            }
+
+            // Destroy the consumers.
+            std::auto_ptr< decaf::util::Iterator< cms::MessageConsumer* > > consIter(
+                consumers.iterator() );
+
+            while( consIter->hasNext() ) {
+                cms::MessageConsumer* consumer = consIter->next();
+                try {
+                    delete consumer;
+                } catch( ... ) {}
+            }
+
+            // Destroy the destinations.
+            std::auto_ptr< decaf::util::Iterator< cms::Destination* > > destIter(
+                destinations.iterator() );
+
+            while( destIter->hasNext() ) {
+                cms::Destination* dest = destIter->next();
+                try {
+                    delete dest;
+                } catch( ... ) {}
+            }
+
+            // Destroy the sessions.
+            std::auto_ptr< decaf::util::Iterator< cms::Session* > > sessIter(
+                sessions.iterator() );
+
+            while( sessIter->hasNext() ) {
+                cms::Session* session = sessIter->next();
+                try {
+                    delete session;
+                } catch( ... ) {}
+            }
+
+            // Destroy the connections,
+            connIter.reset( connections.iterator() );
+
+            while( connIter->hasNext() ) {
+                cms::Connection* conn = connIter->next();
+                try {
+                    delete conn;
+                } catch( ... ) {}
+            }
 
-        while( destIter->hasNext() ) {
-            cms::Destination* dest = destIter->next();
-            try {
-                delete dest;
-            } catch( ... ) {}
+            // Empty all the lists.
+            releaseAll();
         }
-
-        // Destroy the sessions.
-        std::auto_ptr< decaf::util::Iterator< cms::Session* > > sessIter(
-            sessions.iterator() );
-
-        while( sessIter->hasNext() ) {
-            cms::Session* session = sessIter->next();
-            try {
-                delete session;
-            } catch( ... ) {}
-        }
-
-        // Destroy the connections,
-        connIter.reset( connections.iterator() );
-
-        while( connIter->hasNext() ) {
-            cms::Connection* conn = connIter->next();
-            try {
-                delete conn;
-            } catch( ... ) {}
-        }
-
-        // Empty all the lists.
-        releaseAll();
     }
     CMSTEMPLATE_CATCHALL()
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Mar 26 21:11:12 2012
@@ -64,6 +64,7 @@ using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::core::policies;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
@@ -84,8 +85,8 @@ namespace core{
     class ConnectionConfig {
     private:
 
-        ConnectionConfig( const ConnectionConfig& );
-        ConnectionConfig& operator= ( const ConnectionConfig& );
+        ConnectionConfig(const ConnectionConfig&);
+        ConnectionConfig& operator=(const ConnectionConfig&);
 
     public:
 
@@ -94,7 +95,7 @@ namespace core{
                                      commands::ConsumerId::COMPARATOR > DispatcherMap;
 
         typedef decaf::util::StlMap< Pointer<commands::ProducerId>,
-                                     ActiveMQProducer*,
+                                     Pointer<ActiveMQProducerKernel>,
                                      commands::ProducerId::COMPARATOR > ProducerMap;
 
     public:
@@ -184,17 +185,17 @@ namespace core{
                              activeSessions(),
                              transportListeners() {
 
-            this->defaultPrefetchPolicy.reset( new DefaultPrefetchPolicy() );
-            this->defaultRedeliveryPolicy.reset( new DefaultRedeliveryPolicy() );
-            this->clientIdGenerator.reset(new util::IdGenerator );
-            this->connectionInfo.reset( new ConnectionInfo() );
-            this->brokerInfoReceived.reset( new CountDownLatch(1) );
+            this->defaultPrefetchPolicy.reset(new DefaultPrefetchPolicy());
+            this->defaultRedeliveryPolicy.reset(new DefaultRedeliveryPolicy());
+            this->clientIdGenerator.reset(new util::IdGenerator);
+            this->connectionInfo.reset(new ConnectionInfo());
+            this->brokerInfoReceived.reset(new CountDownLatch(1));
 
             // Generate a connectionId
             std::string uniqueId = CONNECTION_ID_GENERATOR.generateId();
-            decaf::lang::Pointer<ConnectionId> connectionId( new ConnectionId() );
+            decaf::lang::Pointer<ConnectionId> connectionId(new ConnectionId());
             connectionId->setValue(uniqueId);
-            this->connectionInfo->setConnectionId( connectionId );
+            this->connectionInfo->setConnectionId(connectionId);
             this->scheduler.reset(new Scheduler(std::string("ActiveMQConnection[")+uniqueId+"] Scheduler"));
             this->scheduler->start();
         }
@@ -209,23 +210,23 @@ namespace core{
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::ActiveMQConnection( const Pointer<transport::Transport>& transport,
-                                        const Pointer<decaf::util::Properties>&
properties ) :
-    config( NULL ),
-    connectionMetaData( new ActiveMQConnectionMetaData() ),
+ActiveMQConnection::ActiveMQConnection(const Pointer<transport::Transport>& transport,
+                                       const Pointer<decaf::util::Properties>&
properties) :
+    config(NULL),
+    connectionMetaData(new ActiveMQConnectionMetaData()),
     started(false),
     closed(false),
     closing(false),
     transportFailed(false) {
 
-    Pointer<ConnectionConfig> configuration( new ConnectionConfig );
+    Pointer<ConnectionConfig> configuration(new ConnectionConfig);
 
     // Register for messages and exceptions from the connector.
-    transport->setTransportListener( this );
+    transport->setTransportListener(this);
 
     // Set the initial state of the ConnectionInfo
-    configuration->connectionInfo->setManageable( false );
-    configuration->connectionInfo->setFaultTolerant( transport->isFaultTolerant()
);
+    configuration->connectionInfo->setManageable(false);
+    configuration->connectionInfo->setFaultTolerant(transport->isFaultTolerant());
 
     // Store of the transport and properties, the Connection now owns them.
     configuration->properties = properties;
@@ -235,7 +236,7 @@ ActiveMQConnection::ActiveMQConnection( 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::~ActiveMQConnection() throw() {
+ActiveMQConnection::~ActiveMQConnection() {
     try {
 
         try{
@@ -255,8 +256,8 @@ void ActiveMQConnection::addDispatcher(
 
     try{
         // Add the consumer to the map.
-        synchronized( &this->config->dispatchers ) {
-            this->config->dispatchers.put( consumer, dispatcher );
+        synchronized(&this->config->dispatchers) {
+            this->config->dispatchers.put(consumer, dispatcher);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -267,8 +268,8 @@ void ActiveMQConnection::removeDispatche
 
     try{
         // Remove the consumer from the map.
-        synchronized( &this->config->dispatchers ) {
-            this->config->dispatchers.remove( consumer );
+        synchronized(&this->config->dispatchers) {
+            this->config->dispatchers.remove(consumer);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -277,13 +278,13 @@ void ActiveMQConnection::removeDispatche
 ////////////////////////////////////////////////////////////////////////////////
 cms::Session* ActiveMQConnection::createSession() {
     try {
-        return createSession( Session::AUTO_ACKNOWLEDGE );
+        return createSession(Session::AUTO_ACKNOWLEDGE);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQConnection::createSession( cms::Session::AcknowledgeMode ackMode )
{
+cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ackMode) {
 
     try {
 
@@ -292,7 +293,7 @@ cms::Session* ActiveMQConnection::create
 
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
-            this, getNextSessionId(), ackMode, *this->config->properties );
+            this, getNextSessionId(), ackMode, *this->config->properties);
 
         return session;
     }
@@ -302,9 +303,9 @@ cms::Session* ActiveMQConnection::create
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<SessionId> ActiveMQConnection::getNextSessionId() {
 
-    decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
-    sessionId->setConnectionId( this->config->connectionInfo->getConnectionId()->getValue()
);
-    sessionId->setValue( this->config->sessionIds.getNextSequenceId() );
+    decaf::lang::Pointer<SessionId> sessionId(new SessionId());
+    sessionId->setConnectionId(this->config->connectionInfo->getConnectionId()->getValue());
+    sessionId->setValue(this->config->sessionIds.getNextSequenceId());
 
     return sessionId;
 }
@@ -315,43 +316,40 @@ void ActiveMQConnection::addSession( Act
     try {
 
         // Remove this session from the set of active sessions.
-        this->config->activeSessions.add( session );
+        this->config->activeSessions.add(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeSession( ActiveMQSession* session ) {
+void ActiveMQConnection::removeSession(ActiveMQSession* session) {
 
     try {
-
         // Remove this session from the set of active sessions.
-        this->config->activeSessions.remove( session );
+        this->config->activeSessions.remove(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::addProducer( ActiveMQProducer* producer ) {
+void ActiveMQConnection::addProducer(Pointer<ActiveMQProducerKernel> producer) {
 
     try {
-
         // Add this producer from the set of active consumer.
-        synchronized( &this->config->activeProducers ) {
-            this->config->activeProducers.put( producer->getProducerInfo()->getProducerId(),
producer );
+        synchronized(&this->config->activeProducers) {
+            this->config->activeProducers.put(producer->getProducerInfo()->getProducerId(),
producer);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeProducer( const decaf::lang::Pointer<ProducerId>&
producerId ) {
+void ActiveMQConnection::removeProducer(const decaf::lang::Pointer<ProducerId>&
producerId) {
 
     try {
-
         // Remove this producer from the set of active consumer.
-        synchronized( &this->config->activeProducers ) {
-            this->config->activeProducers.remove( producerId );
+        synchronized(&this->config->activeProducers) {
+            this->config->activeProducers.remove(producerId);
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -360,7 +358,7 @@ void ActiveMQConnection::removeProducer(
 ////////////////////////////////////////////////////////////////////////////////
 std::string ActiveMQConnection::getClientID() const {
 
-    if( this->isClosed() ) {
+    if (this->isClosed()) {
         return "";
     }
 
@@ -370,30 +368,30 @@ std::string ActiveMQConnection::getClien
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::setClientID( const std::string& clientID ) {
 
-    if( this->closed.get() ) {
-        throw cms::IllegalStateException( "Connection is already closed", NULL );
+    if (this->closed.get()) {
+        throw cms::IllegalStateException("Connection is already closed", NULL);
     }
 
-    if( this->config->clientIDSet ) {
-        throw cms::IllegalStateException( "Client ID is already set", NULL );
+    if (this->config->clientIDSet) {
+        throw cms::IllegalStateException("Client ID is already set", NULL);
     }
 
-    if( this->config->isConnectionInfoSentToBroker ) {
-        throw cms::IllegalStateException( "Cannot set client Id on a Connection already in
use.", NULL );
+    if (this->config->isConnectionInfoSentToBroker) {
+        throw cms::IllegalStateException("Cannot set client Id on a Connection already in
use.", NULL);
     }
 
-    if( clientID.empty() ) {
-        throw cms::InvalidClientIdException( "Client ID cannot be an empty string", NULL
);
+    if (clientID.empty()) {
+        throw cms::InvalidClientIdException("Client ID cannot be an empty string", NULL);
     }
 
-    this->config->connectionInfo->setClientId( clientID );
+    this->config->connectionInfo->setClientId(clientID);
     this->config->userSpecifiedClientID = true;
     ensureConnectionInfoSent();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::setDefaultClientId( const std::string& clientId ) {
-    this->setClientID( clientId );
+void ActiveMQConnection::setDefaultClientId(const std::string& clientId) {
+    this->setClientID(clientId);
     this->config->userSpecifiedClientID = true;
 }
 
@@ -402,12 +400,12 @@ void ActiveMQConnection::close() {
 
     try {
 
-        if( this->isClosed() ) {
+        if (this->isClosed()) {
             return;
         }
 
         // If we are running lets stop first.
-        if( !this->transportFailed.get() ) {
+        if (!this->transportFailed.get()) {
             this->stop();
         }
 
@@ -415,7 +413,7 @@ void ActiveMQConnection::close() {
         // passed on from the transport as it goes down.
         this->closing.set( true );
 
-        if(this->config->scheduler != NULL) {
+        if (this->config->scheduler != NULL) {
             try {
                 this->config->scheduler->stop();
             }
@@ -423,30 +421,29 @@ void ActiveMQConnection::close() {
         }
 
         // Get the complete list of active sessions.
-        std::auto_ptr< Iterator<ActiveMQSession*> > iter( this->config->activeSessions.iterator()
);
+        std::auto_ptr< Iterator<ActiveMQSession*> > iter(this->config->activeSessions.iterator());
 
         long long lastDeliveredSequenceId = 0;
 
         // Dispose of all the Session resources we know are still open.
-        while( iter->hasNext() ) {
+        while (iter->hasNext()) {
             ActiveMQSession* session = iter->next();
             try{
                 session->dispose();
-
                 lastDeliveredSequenceId =
-                    Math::max( lastDeliveredSequenceId, session->getLastDeliveredSequenceId()
);
+                    Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId());
             } catch( cms::CMSException& ex ){
                 /* Absorb */
             }
         }
 
         // Now inform the Broker we are shutting down.
-        this->disconnect( lastDeliveredSequenceId );
+        this->disconnect(lastDeliveredSequenceId);
 
         // Once current deliveries are done this stops the delivery
         // of any new messages.
-        this->started.set( false );
-        this->closed.set( true );
+        this->started.set(false);
+        this->closed.set(true);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -720,11 +717,11 @@ void ActiveMQConnection::onCommand( cons
             ProducerAck* producerAck = dynamic_cast<ProducerAck*>( command.get() );
 
             // Get the consumer info object for this consumer.
-            ActiveMQProducer* producer = NULL;
+            Pointer<ActiveMQProducerKernel> producer;
             synchronized( &this->config->activeProducers ) {
-                producer = this->config->activeProducers.get( producerAck->getProducerId()
);
-                if( producer != NULL ){
-                    producer->onProducerAck( *producerAck );
+                producer = this->config->activeProducers.get(producerAck->getProducerId());
+                if (producer != NULL) {
+                    producer->onProducerAck(*producerAck);
                 }
             }
 
@@ -747,7 +744,6 @@ void ActiveMQConnection::onCommand( cons
             } catch( ... ) { /* do nothing */ }
 
         } else {
-            //LOGDECAF_WARN( logger, "Received an unknown command" );
         }
 
         Pointer< Iterator<TransportListener*> > iter( this->config->transportListeners.iterator()
);
@@ -974,7 +970,7 @@ const ConnectionInfo& ActiveMQConnection
 ////////////////////////////////////////////////////////////////////////////////
 const ConnectionId& ActiveMQConnection::getConnectionId() const {
     checkClosed();
-    return *( this->config->connectionInfo->getConnectionId() );
+    return *(this->config->connectionInfo->getConnectionId());
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1040,10 +1036,6 @@ void ActiveMQConnection::signalInterrupt
         if( failoverTransport != NULL ) {
             failoverTransport->setConnectionInterruptProcessingComplete(
                 this->config->connectionInfo->getConnectionId() );
-
-            //if( LOG.isDebugEnabled() ) {
-            //    LOG.debug("transportInterruptionProcessingComplete for: " + this.getConnectionInfo().getConnectionId());
-            //}
         }
     }
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1305601&r1=1305600&r2=1305601&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon Mar 26 21:11:12 2012
@@ -30,6 +30,7 @@
 #include <activemq/transport/Transport.h>
 #include <activemq/transport/TransportListener.h>
 #include <activemq/threads/Scheduler.h>
+#include <activemq/core/kernels/ActiveMQProducerKernel.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/util/concurrent/CopyOnWriteArrayList.h>
@@ -47,7 +48,6 @@ namespace core{
     using decaf::util::concurrent::atomic::AtomicBoolean;
 
     class ActiveMQSession;
-    class ActiveMQProducer;
     class ConnectionConfig;
     class PrefetchPolicy;
     class RedeliveryPolicy;
@@ -93,8 +93,8 @@ namespace core{
 
     private:
 
-        ActiveMQConnection( const ActiveMQConnection& );
-        ActiveMQConnection& operator= ( const ActiveMQConnection& );
+        ActiveMQConnection(const ActiveMQConnection&);
+        ActiveMQConnection& operator=(const ActiveMQConnection&);
 
     public:
 
@@ -106,10 +106,10 @@ namespace core{
          * @param properties
          *        The Properties that were defined for this connection
          */
-        ActiveMQConnection( const Pointer<transport::Transport>& transport,
-                            const Pointer<decaf::util::Properties>& properties
);
+        ActiveMQConnection(const Pointer<transport::Transport>& transport,
+                           const Pointer<decaf::util::Properties>& properties);
 
-        virtual ~ActiveMQConnection() throw();
+        virtual ~ActiveMQConnection();
 
         /**
          * Adds the session resources for the given session instance.
@@ -119,7 +119,7 @@ namespace core{
          *
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void addSession( ActiveMQSession* session );
+        virtual void addSession(ActiveMQSession* session);
 
         /**
          * Removes the session resources for the given session instance.
@@ -129,7 +129,7 @@ namespace core{
          *
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void removeSession( ActiveMQSession* session );
+        virtual void removeSession(ActiveMQSession* session);
 
         /**
          * Adds an active Producer to the Set of known producers.
@@ -139,14 +139,14 @@ namespace core{
          *
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void addProducer( ActiveMQProducer* producer );
+        virtual void addProducer(Pointer<kernels::ActiveMQProducerKernel> producer);
 
         /**
          * Removes an active Producer to the Set of known producers.
          * @param producerId - The ProducerId to remove from the the known set.
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void removeProducer( const Pointer<commands::ProducerId>& producerId
);
+        virtual void removeProducer(const Pointer<commands::ProducerId>& producerId);
 
         /**
          * Adds a dispatcher for a consumer.
@@ -154,14 +154,14 @@ namespace core{
          * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void addDispatcher( const Pointer<commands::ConsumerId>& consumer,
Dispatcher* dispatcher );
+        virtual void addDispatcher(const Pointer<commands::ConsumerId>& consumer,
Dispatcher* dispatcher);
 
         /**
          * Removes the dispatcher for a consumer.
          * @param consumer - The consumer for which to remove the dispatcher.
          * @throws CMSException if an error occurs while removing performing the operation.
          */
-        virtual void removeDispatcher( const Pointer<commands::ConsumerId>& consumer
);
+        virtual void removeDispatcher(const Pointer<commands::ConsumerId>& consumer);
 
         /**
          * If supported sends a message pull request to the service provider asking
@@ -173,7 +173,7 @@ namespace core{
          *
          * @throws ActiveMQException if an error occurs while removing performing the operation.
          */
-        virtual void sendPullRequest( const commands::ConsumerInfo* consumer, long long timeout
);
+        virtual void sendPullRequest(const commands::ConsumerInfo* consumer, long long timeout);
 
         /**
          * Checks if this connection has been closed
@@ -217,7 +217,7 @@ namespace core{
          * @throws ActiveMQException
          *         If any other error occurs during the attempt to destroy the destination.
          */
-        virtual void destroyDestination( const commands::ActiveMQDestination* destination
);
+        virtual void destroyDestination(const commands::ActiveMQDestination* destination);
 
         /**
          * Requests that the Broker removes the given Destination.  Calling this
@@ -237,7 +237,7 @@ namespace core{
          * @throws ActiveMQException
          *         If any other error occurs during the attempt to destroy the destination.
          */
-        virtual void destroyDestination( const cms::Destination* destination );
+        virtual void destroyDestination(const cms::Destination* destination);
 
     public:   // Connection Interface Methods
 
@@ -261,12 +261,12 @@ namespace core{
         /**
          * {@inheritDoc}
          */
-        virtual void setClientID( const std::string& clientID );
+        virtual void setClientID(const std::string& clientID);
 
         /**
          * {@inheritDoc}
          */
-        virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode );
+        virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode);
 
         /**
          * {@inheritDoc}
@@ -291,7 +291,7 @@ namespace core{
         /**
          * {@inheritDoc}
          */
-        virtual void setExceptionListener( cms::ExceptionListener* listener );
+        virtual void setExceptionListener(cms::ExceptionListener* listener);
 
     public:   // Configuration Options
 
@@ -299,7 +299,7 @@ namespace core{
          * Sets the username that should be used when creating a new connection
          * @param username string
          */
-        void setUsername( const std::string& username );
+        void setUsername(const std::string& username);
 
         /**
          * Gets the username that this factory will use when creating a new
@@ -312,7 +312,7 @@ namespace core{
          * Sets the password that should be used when creating a new connection
          * @param password string
          */
-        void setPassword( const std::string& password );
+        void setPassword(const std::string& password);
 
         /**
          * Gets the password that this factory will use when creating a new
@@ -325,14 +325,14 @@ namespace core{
          * Sets the Client Id.
          * @param clientId - The new clientId value.
          */
-        void setDefaultClientId( const std::string& clientId );
+        void setDefaultClientId(const std::string& clientId);
 
         /**
          * Sets the Broker URL that should be used when creating a new
          * connection instance
          * @param brokerURL string
          */
-        void setBrokerURL( const std::string& brokerURL );
+        void setBrokerURL(const std::string& brokerURL);
 
         /**
          * Gets the Broker URL that this factory will use when creating a new
@@ -349,7 +349,7 @@ namespace core{
          * @param policy
          *      The new PrefetchPolicy that the ConnectionFactory should clone for Connections.
          */
-        void setPrefetchPolicy( PrefetchPolicy* policy );
+        void setPrefetchPolicy(PrefetchPolicy* policy);
 
         /**
          * Gets the pointer to the current PrefetchPolicy that is in use by this ConnectionFactory.
@@ -366,7 +366,7 @@ namespace core{
          * @param policy
          *      The new RedeliveryPolicy that the ConnectionFactory should clone for Connections.
          */
-        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+        void setRedeliveryPolicy(RedeliveryPolicy* policy);
 
         /**
          * Gets the pointer to the current RedeliveryPolicy that is in use by this ConnectionFactory.
@@ -388,7 +388,7 @@ namespace core{
          * @param value
          *        The value of the dispatch asynchronously option sent to the broker.
          */
-        void setDispatchAsync( bool value );
+        void setDispatchAsync(bool value);
 
         /**
          * Gets if the Connection should always send things Synchronously.
@@ -402,7 +402,7 @@ namespace core{
          * @param value
          *        true if sends should always be Synchronous.
          */
-        void setAlwaysSyncSend( bool value );
+        void setAlwaysSyncSend(bool value);
 
         /**
          * Gets if the useAsyncSend option is set
@@ -414,7 +414,7 @@ namespace core{
          * Sets the useAsyncSend option
          * @param value - true to activate, false to disable.
          */
-        void setUseAsyncSend( bool value );
+        void setUseAsyncSend(bool value);
 
         /**
          * Gets if the Connection is configured for Message body compression.
@@ -428,7 +428,7 @@ namespace core{
          * @param value
          *      Boolean indicating if Message body compression is enabled.
          */
-        void setUseCompression( bool value );
+        void setUseCompression(bool value);
 
         /**
          * Sets the Compression level used when Message body compression is enabled, a
@@ -439,7 +439,7 @@ namespace core{
          * @param value
          *      A signed int value that controls the compression level.
          */
-        void setCompressionLevel( int value );
+        void setCompressionLevel(int value);
 
         /**
          * Gets the currently configured Compression level for Message bodies.
@@ -459,7 +459,7 @@ namespace core{
          * cause all messages to be sent using a Synchronous request is non-zero.
          * @param timeout - The time to wait for a response.
          */
-        void setSendTimeout( unsigned int timeout );
+        void setSendTimeout(unsigned int timeout);
 
         /**
          * Gets the assigned close timeout for this Connector
@@ -471,7 +471,7 @@ namespace core{
          * Sets the close timeout to use when sending the disconnect request.
          * @param timeout - The time to wait for a close message.
          */
-        void setCloseTimeout( unsigned int timeout );
+        void setCloseTimeout(unsigned int timeout);
 
         /**
          * Gets the configured producer window size for Producers that are created
@@ -488,7 +488,7 @@ namespace core{
          * message to be sent.
          * @param windowSize - The size in bytes of the Producers memory window.
          */
-        void setProducerWindowSize( unsigned int windowSize );
+        void setProducerWindowSize(unsigned int windowSize);
 
         /**
          * @returns true if the Connections that this factory creates should support the
@@ -503,7 +503,7 @@ namespace core{
          * @param value
          *      Boolean indicating if Message priority should be enabled.
          */
-        void setMessagePrioritySupported( bool value );
+        void setMessagePrioritySupported(bool value);
 
         /**
          * Get the Next Temporary Destination Id
@@ -530,7 +530,7 @@ namespace core{
          *      The TransportListener instance to add to this Connection's set of listeners
          *      to notify of Transport events.
          */
-        void addTransportListener( transport::TransportListener* transportListener );
+        void addTransportListener(transport::TransportListener* transportListener);
 
         /**
          * Removes a registered TransportListener from the Connection's set of Transport
@@ -540,20 +540,20 @@ namespace core{
          * @param transportListener
          *      The pointer to the TransportListener to remove from the set of listeners.
          */
-        void removeTransportListener( transport::TransportListener* transportListener );
+        void removeTransportListener(transport::TransportListener* transportListener);
 
         /**
          * Event handler for the receipt of a non-response command from the
          * transport.
          * @param command the received command object.
          */
-        virtual void onCommand( const Pointer<commands::Command>& command );
+        virtual void onCommand(const Pointer<commands::Command>& command);
 
         /**
          * Event handler for an exception from a command transport.
          * @param ex The exception.
          */
-        virtual void onException( const decaf::lang::Exception& ex );
+        virtual void onException(const decaf::lang::Exception& ex);
 
         /**
          * The transport has suffered an interruption from which it hopes to recover
@@ -621,7 +621,7 @@ namespace core{
          * @throws ActiveMQException if not currently connected, or if the operation
          *         fails for any reason.
          */
-        void oneway( Pointer<commands::Command> command );
+        void oneway(Pointer<commands::Command> command);
 
         /**
          * Sends a synchronous request and returns the response from the broker.  This
@@ -637,7 +637,7 @@ namespace core{
          * @throws BrokerException if the response from the broker is of type ExceptionResponse.
          * @throws ActiveMQException if any other error occurs while sending the Command.
          */
-        Pointer<commands::Response> syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 );
+        Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
 
         /**
          * Notify the exception listener
@@ -664,7 +664,7 @@ namespace core{
          * @param ex
          *      The exception that caused the error condition.
          */
-        void onAsyncException( const decaf::lang::Exception& ex );
+        void onAsyncException(const decaf::lang::Exception& ex);
 
         /**
          * Check for Closed State and Throw an exception if true.
@@ -693,7 +693,7 @@ namespace core{
         virtual Pointer<commands::SessionId> getNextSessionId();
 
         // Sends a oneway disconnect message to the broker.
-        void disconnect( long long lastDeliveredSequenceId );
+        void disconnect(long long lastDeliveredSequenceId);
 
         // Waits for all Consumers to handle the Transport Interrupted event.
         void waitForTransportInterruptionProcessingToComplete();



Mime
View raw message