activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r745701 [1/3] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/core/ main/activemq/state/ main/activemq/transport/ main/activemq/transport/correlator/ main/activemq/transport/failover/ main/activemq/transport/logging/ main/activem...
Date Thu, 19 Feb 2009 01:06:08 GMT
Author: tabish
Date: Thu Feb 19 01:06:07 2009
New Revision: 745701

URL: http://svn.apache.org/viewvc?rev=745701&view=rev
Log:
http://issues.apache.org/activemq/browse/AMQCPP-100

Refactoring of the Transports and Commands to use Smart Pointers to pass data around and avoid making copies as much as possible.  Simplified code in many of the classes as there's less lifetime management needed for the self managed commands.

Added:
    activemq/activemq-cpp/trunk/src/main/Makefile.maven
Modified:
    activemq/activemq-cpp/trunk/src/main/Makefile.am
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h
    activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/FutureResponse.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormat.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/WireFormatNegotiator.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormat.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQSessionTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h
    activemq/activemq-cpp/trunk/src/test/activemq/transport/mock/MockTransportFactoryTest.cpp

Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.am Thu Feb 19 01:06:07 2009
@@ -307,6 +307,7 @@
     decaf/lang/Boolean.h \
     decaf/lang/exceptions/NoSuchElementException.h \
     decaf/lang/exceptions/RuntimeException.h \
+    decaf/lang/exceptions/ClassCastException.h \
     decaf/lang/exceptions/IndexOutOfBoundsException.h \
     decaf/lang/exceptions/NullPointerException.h \
     decaf/lang/exceptions/IllegalStateException.h \

Added: activemq/activemq-cpp/trunk/src/main/Makefile.maven
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.maven?rev=745701&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/Makefile.maven (added)
+++ activemq/activemq-cpp/trunk/src/main/Makefile.maven Thu Feb 19 01:06:07 2009
@@ -0,0 +1,38 @@
+# ---------------------------------------------------------------------------
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+# 
+# http://www.apache.org/licenses/LICENSE-2.0
+# 
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+# ---------------------------------------------------------------------------
+
+cc_sources = \
+    ${source}
+    
+h_sources = \
+    ${headers}
+
+##
+## Compiler / Linker Info
+##
+
+lib_LTLIBRARIES= libactivemq-cpp.la
+libactivemq_cpp_la_SOURCES= $(h_sources) $(cc_sources)
+libactivemq_cpp_la_CXXFLAGS= $(AMQ_CXXFLAGS)
+libactivemq_cpp_la_LDFLAGS= -version-info $(ACTIVEMQ_LIBRARY_VERSION)
+libactivemq_cpp_la_LIBADD= $(AMQ_LIBS)
+
+##
+## Packaging Info
+##
+library_includedir=$(includedir)/$(ACTIVEMQ_LIBRARY_NAME)-$(ACTIVEMQ_VERSION)
+nobase_library_include_HEADERS = $(h_sources)

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Thu Feb 19 01:06:07 2009
@@ -53,6 +53,7 @@
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::util;
@@ -60,14 +61,14 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnection::ActiveMQConnection( transport::Transport* transport,
-                                        decaf::util::Properties* properties )
- :  ActiveMQConnectionSupport( transport, properties ){
-
-    this->started = false;
-    this->closed = false;
-    this->exceptionListener = NULL;
-    this->connectionMetaData.reset( new ActiveMQConnectionMetaData() );
+ActiveMQConnection::ActiveMQConnection( const Pointer<transport::Transport>& transport,
+                                        const Pointer<decaf::util::Properties>& properties ) :
+    ActiveMQConnectionSupport( transport, properties ),
+    connectionMetaData( new ActiveMQConnectionMetaData() ),
+    connectionInfo( new ConnectionInfo() ),
+    started( false ),
+    closed( false ),
+    exceptionListener( NULL ) {
 
     // Register for messages and exceptions from the connector.
     transport->setTransportListener( this );
@@ -90,7 +91,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::addDispatcher(
-    const decaf::lang::Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher )
+    const decaf::lang::Pointer<ConsumerId>& consumer, Dispatcher* dispatcher )
         throw ( cms::CMSException ) {
 
     // Add the consumer to the map.
@@ -101,7 +102,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::removeDispatcher(
-    const decaf::lang::Pointer<commands::ConsumerId>& consumer )
+    const decaf::lang::Pointer<ConsumerId>& consumer )
         throw ( cms::CMSException ) {
 
     // Remove the consumer from the map.
@@ -129,18 +130,18 @@
         enforceConnected();
 
         // Create and initialize a new SessionInfo object
-        std::auto_ptr<commands::SessionInfo> sessionInfo( new commands::SessionInfo() );
-        decaf::lang::Pointer<commands::SessionId> sessionId( new commands::SessionId() );
-        sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() );
+        Pointer<SessionInfo> sessionInfo( new SessionInfo() );
+        decaf::lang::Pointer<SessionId> sessionId( new SessionId() );
+        sessionId->setConnectionId( connectionInfo->getConnectionId()->getValue() );
         sessionId->setValue( this->getNextSessionId() );
         sessionInfo->setSessionId( sessionId );
 
         // Send the subscription message to the broker.
-        syncRequest( sessionInfo.get() );
+        syncRequest( sessionInfo );
 
         // Create the session instance.
         ActiveMQSession* session = new ActiveMQSession(
-            sessionInfo.release(), ackMode, this->getProperties(), this );
+            sessionInfo, ackMode, this->getProperties(), this );
 
         // Add the session to the set of active sessions.
         synchronized( &activeSessions ) {
@@ -192,9 +193,8 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::removeProducer(
-    const decaf::lang::Pointer<commands::ProducerId>& producerId )
-        throw ( cms::CMSException ) {
+void ActiveMQConnection::removeProducer( const decaf::lang::Pointer<ProducerId>& producerId )
+    throw ( cms::CMSException ) {
 
     try {
 
@@ -309,24 +309,24 @@
         this->startupTransport();
 
         // Fill in our connection info.
-        connectionInfo.setUserName( this->getUsername() );
-        connectionInfo.setPassword( this->getPassword() );
+        connectionInfo->setUserName( this->getUsername() );
+        connectionInfo->setPassword( this->getPassword() );
 
         // Get or Create a Client Id
         string clientId = this->getClientId();
         if( clientId.length() > 0 ){
-            connectionInfo.setClientId( clientId );
+            connectionInfo->setClientId( clientId );
         } else {
-            connectionInfo.setClientId( UUID::randomUUID().toString() );
+            connectionInfo->setClientId( UUID::randomUUID().toString() );
         }
 
         // Generate a connectionId
-        decaf::lang::Pointer<commands::ConnectionId> connectionId( new commands::ConnectionId() );
+        decaf::lang::Pointer<ConnectionId> connectionId( new ConnectionId() );
         connectionId->setValue( UUID::randomUUID().toString() );
-        connectionInfo.setConnectionId( connectionId );
+        connectionInfo->setConnectionId( connectionId );
 
         // Now we ping the broker and see if we get an ack / nack
-        syncRequest( &connectionInfo );
+        syncRequest( connectionInfo );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -339,11 +339,11 @@
     try{
 
         // Remove our ConnectionId from the Broker
-        disposeOf( connectionInfo.getConnectionId(), this->getCloseTimeout() );
+        disposeOf( connectionInfo->getConnectionId(), this->getCloseTimeout() );
 
         // Send the disconnect command to the broker.
-        commands::ShutdownInfo shutdown;
-        oneway( &shutdown );
+        Pointer<ShutdownInfo> shutdown( new ShutdownInfo() );
+        oneway( shutdown );
 
         // Allow the Support class to shutdown its resources, including the Transport.
         this->shutdownTransport();
@@ -355,18 +355,18 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::sendPullRequest(
-    const commands::ConsumerInfo* consumer, long long timeout ) throw ( ActiveMQException ) {
+    const ConsumerInfo* consumer, long long timeout ) throw ( ActiveMQException ) {
 
     try {
 
          if( consumer->getPrefetchSize() == 0 ) {
 
-             commands::MessagePull messagePull;
-             messagePull.setConsumerId( consumer->getConsumerId() );
-             messagePull.setDestination( consumer->getDestination() );
-             messagePull.setTimeout( timeout );
+             Pointer<MessagePull> messagePull( new MessagePull() );
+             messagePull->setConsumerId( consumer->getConsumerId() );
+             messagePull->setDestination( consumer->getDestination() );
+             messagePull->setTimeout( timeout );
 
-             this->oneway( &messagePull );
+             this->oneway( messagePull );
          }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -375,7 +375,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::destroyDestination( const commands::ActiveMQDestination* destination )
+void ActiveMQConnection::destroyDestination( const ActiveMQDestination* destination )
     throw( decaf::lang::exceptions::NullPointerException,
            decaf::lang::exceptions::IllegalStateException,
            decaf::lang::exceptions::UnsupportedOperationException,
@@ -390,16 +390,14 @@
 
         enforceConnected();
 
-        commands::DestinationInfo command;
+        Pointer<DestinationInfo> command( new DestinationInfo() );
 
-        command.setConnectionId( connectionInfo.getConnectionId() );
-        command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
-        command.setDestination(
-            decaf::lang::Pointer<commands::ActiveMQDestination>(
-                destination->cloneDataStructure() ) );
+        command->setConnectionId( connectionInfo->getConnectionId() );
+        command->setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
+        command->setDestination( Pointer<ActiveMQDestination>( destination->cloneDataStructure() ) );
 
         // Send the message to the broker.
-        syncRequest( &command );
+        syncRequest( command );
     }
     AMQ_CATCH_RETHROW( NullPointerException )
     AMQ_CATCH_RETHROW( IllegalStateException )
@@ -424,8 +422,8 @@
 
         enforceConnected();
 
-        const commands::ActiveMQDestination* amqDestination =
-            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
 
         this->destroyDestination( amqDestination );
     }
@@ -437,16 +435,14 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::onCommand( commands::Command* command ) {
+void ActiveMQConnection::onCommand( const Pointer<Command>& command ) {
 
     try{
 
-        std::auto_ptr<commands::Command> commandPtr( command );
-
         if( command->isMessageDispatch() ) {
 
-            commands::MessageDispatch* dispatch =
-                dynamic_cast<commands::MessageDispatch*>( command );
+            MessageDispatch* dispatch =
+                dynamic_cast<MessageDispatch*>( command.get() );
 
             // Check fo an empty Message, shouldn't ever happen but who knows.
             if( dispatch->getMessage() == NULL ) {
@@ -474,8 +470,8 @@
 
         } else if( command->isProducerAck() ) {
 
-            commands::ProducerAck* producerAck =
-                dynamic_cast<commands::ProducerAck*>( command );
+            ProducerAck* producerAck =
+                dynamic_cast<ProducerAck*>( command.get() );
 
             // Get the consumer info object for this consumer.
             ActiveMQProducer* producer = NULL;
@@ -487,11 +483,11 @@
             }
 
         } else if( command->isWireFormatInfo() ) {
-            this->brokerWireFormatInfo.reset(
-                dynamic_cast<commands::WireFormatInfo*>( commandPtr.release() ) );
+            this->brokerWireFormatInfo =
+                command.dynamicCast<WireFormatInfo, Pointer<WireFormatInfo>::CounterType>();
         } else if( command->isBrokerInfo() ) {
-            this->brokerInfo.reset(
-                dynamic_cast<commands::BrokerInfo*>( commandPtr.release() ) );
+            this->brokerInfo =
+                command.dynamicCast<BrokerInfo, Pointer<BrokerInfo>::CounterType>();
         } else if( command->isKeepAliveInfo() ) {
 
             if( command->isResponseRequired() ) {
@@ -539,7 +535,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::oneway( commands::Command* command )
+void ActiveMQConnection::oneway( Pointer<Command> command )
     throw ( ActiveMQException ) {
 
     try {
@@ -553,28 +549,28 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::syncRequest( commands::Command* command, unsigned int timeout )
+void ActiveMQConnection::syncRequest( Pointer<Command> command, unsigned int timeout )
     throw ( ActiveMQException ) {
 
     try {
 
         enforceConnected();
 
-        std::auto_ptr<commands::Response> response;
+        Pointer<Response> response;
 
         if( timeout == 0 ) {
-            response.reset( this->getTransport().request( command ) );
+            response = this->getTransport().request( command );
         } else {
-            response.reset( this->getTransport().request( command, timeout ) );
+            response = this->getTransport().request( command, timeout );
         }
 
         commands::ExceptionResponse* exceptionResponse =
-            dynamic_cast<commands::ExceptionResponse*>( response.get() );
+            dynamic_cast<ExceptionResponse*>( response.get() );
 
         if( exceptionResponse != NULL ) {
 
             // Create an exception to hold the error information.
-            commands::BrokerError* brokerError =
+            BrokerError* brokerError =
                 exceptionResponse->getException()->cloneDataStructure();
             BrokerException exception( __FILE__, __LINE__, brokerError );
 
@@ -590,13 +586,13 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId )
+void ActiveMQConnection::disposeOf( const Pointer<DataStructure>& objectId )
     throw ( ActiveMQException ) {
 
     try{
-        commands::RemoveInfo command;
-        command.setObjectId( objectId );
-        oneway( &command );
+        Pointer<RemoveInfo> command( new RemoveInfo() );
+        command->setObjectId( objectId );
+        oneway( command );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -604,14 +600,14 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnection::disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId,
+void ActiveMQConnection::disposeOf( const Pointer<DataStructure>& objectId,
                                     unsigned int timeout )
     throw ( ActiveMQException ) {
 
     try{
-        commands::RemoveInfo command;
-        command.setObjectId( objectId );
-        this->syncRequest( &command, timeout );
+        Pointer<RemoveInfo> command( new RemoveInfo() );
+        command->setObjectId( objectId );
+        this->syncRequest( command, timeout );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -638,19 +634,19 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-const commands::ConnectionInfo& ActiveMQConnection::getConnectionInfo() const
+const ConnectionInfo& ActiveMQConnection::getConnectionInfo() const
     throw( exceptions::ActiveMQException ) {
 
     enforceConnected();
 
-    return this->connectionInfo;
+    return *this->connectionInfo;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-const commands::ConnectionId& ActiveMQConnection::getConnectionId() const
+const ConnectionId& ActiveMQConnection::getConnectionId() const
     throw( exceptions::ActiveMQException ) {
 
     enforceConnected();
 
-    return *( this->connectionInfo.getConnectionId() );
+    return *( this->connectionInfo->getConnectionId() );
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Thu Feb 19 01:06:07 2009
@@ -44,6 +44,8 @@
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQSession;
     class ActiveMQProducer;
 
@@ -56,11 +58,11 @@
     {
     private:
 
-        typedef decaf::util::StlMap< decaf::lang::Pointer<commands::ConsumerId>,
+        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
                                      Dispatcher*,
                                      commands::ConsumerId::COMPARATOR > DispatcherMap;
 
-        typedef decaf::util::StlMap< decaf::lang::Pointer<commands::ProducerId>,
+        typedef decaf::util::StlMap< Pointer<commands::ProducerId>,
                                      ActiveMQProducer*,
                                      commands::ProducerId::COMPARATOR > ProducerMap;
 
@@ -77,6 +79,11 @@
         std::auto_ptr<cms::ConnectionMetaData> connectionMetaData;
 
         /**
+         * Connection Information for this connection to the Broker
+         */
+        Pointer<commands::ConnectionInfo> connectionInfo;
+
+        /**
          * Indicates if this Connection is started
          */
         bool started;
@@ -103,11 +110,6 @@
         decaf::util::StlSet<ActiveMQSession*> activeSessions;
 
         /**
-         * Connection Information for this connection to the Broker
-         */
-        commands::ConnectionInfo connectionInfo;
-
-        /**
          * the registered exception listener
          */
         cms::ExceptionListener* exceptionListener;
@@ -115,12 +117,12 @@
         /**
          * Command sent from the Broker with its BrokerInfo
          */
-        std::auto_ptr<commands::BrokerInfo> brokerInfo;
+        Pointer<commands::BrokerInfo> brokerInfo;
 
         /**
          * Command sent from the Broker with its WireFormatInfo
          */
-        std::auto_ptr<commands::WireFormatInfo> brokerWireFormatInfo;
+        Pointer<commands::WireFormatInfo> brokerWireFormatInfo;
 
     public:
 
@@ -132,8 +134,8 @@
          * @param properties
          *        The Properties that were defined for this connection
          */
-        ActiveMQConnection( transport::Transport* transport,
-                            decaf::util::Properties* properties );
+        ActiveMQConnection( const Pointer<transport::Transport>& transport,
+                            const Pointer<decaf::util::Properties>& properties );
 
         virtual ~ActiveMQConnection();
 
@@ -154,7 +156,7 @@
          * Removes an active Producer to the Set of known producers.
          * @param producerId - The ProducerId to remove from the the known set.
          */
-        virtual void removeProducer( const decaf::lang::Pointer<commands::ProducerId>& producerId )
+        virtual void removeProducer( const Pointer<commands::ProducerId>& producerId )
             throw ( cms::CMSException );
 
         /**
@@ -163,14 +165,14 @@
          * @param dispatcher - The dispatcher to handle incoming messages for the consumer.
          */
         virtual void addDispatcher(
-            const decaf::lang::Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher )
+            const Pointer<commands::ConsumerId>& consumer, Dispatcher* dispatcher )
                 throw ( cms::CMSException );
 
         /**
          * Removes the dispatcher for a consumer.
          * @param consumer - The consumer for which to remove the dispatcher.
          */
-        virtual void removeDispatcher( const decaf::lang::Pointer<commands::ConsumerId>& consumer )
+        virtual void removeDispatcher( const Pointer<commands::ConsumerId>& consumer )
             throw ( cms::CMSException );
 
         /**
@@ -320,7 +322,7 @@
          * transport.
          * @param command the received command object.
          */
-        virtual void onCommand( commands::Command* command );
+        virtual void onCommand( const Pointer<commands::Command>& command );
 
     public: // TransportExceptionListener
 
@@ -354,7 +356,7 @@
          * @throws ConnectorException if not currently connected, or
          * if the operation fails for any reason.
          */
-        void oneway( commands::Command* command )
+        void oneway( Pointer<commands::Command> command )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -365,7 +367,7 @@
          * @throws ConnectorException thrown if an error response was received
          * from the broker, or if any other error occurred.
          */
-        void syncRequest( commands::Command* command, unsigned int timeout = 0 )
+        void syncRequest( Pointer<commands::Command>, unsigned int timeout = 0 )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -375,7 +377,7 @@
          * @throw ConnectorException if any problems occur from sending
          * the message.
          */
-        void disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId )
+        void disposeOf( const Pointer<commands::DataStructure>& objectId )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -386,7 +388,7 @@
          * @throw ConnectorException if any problems occur from sending
          * the message.
          */
-        void disposeOf( const decaf::lang::Pointer<commands::DataStructure>& objectId, unsigned int timeout )
+        void disposeOf( const Pointer<commands::DataStructure>& objectId, unsigned int timeout )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionFactory.cpp Thu Feb 19 01:06:07 2009
@@ -19,6 +19,7 @@
 #include <decaf/net/URI.h>
 #include <decaf/util/UUID.h>
 #include <decaf/util/Properties.h>
+#include <decaf/lang/Pointer.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <activemq/exceptions/ExceptionDefines.h>
 #include <activemq/transport/TransportRegistry.h>
@@ -35,6 +36,7 @@
 using namespace decaf;
 using namespace decaf::net;
 using namespace decaf::util;
+using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -97,9 +99,8 @@
     const std::string& clientId )
        throw ( cms::CMSException ) {
 
-    // Declared here so that they can be deleted in the catch block
-    auto_ptr<Properties> properties( new Properties() );
-    auto_ptr<Transport> transport;
+    Pointer<Transport> transport;
+    Pointer<Properties> properties( new Properties() );
     auto_ptr<ActiveMQConnection> connection;
     std::string clientIdLocal = clientId;
 
@@ -128,10 +129,10 @@
         activemq::util::URISupport::parseQuery( uri.getQuery(), properties.get() );
 
         // Use the TransportBuilder to get our Transport
-        transport.reset(
-            TransportRegistry::getInstance().findFactory( uri.getScheme() )->create( uri ) );
+        transport =
+            TransportRegistry::getInstance().findFactory( uri.getScheme() )->create( uri );
 
-        if( transport.get() == NULL ){
+        if( transport == NULL ){
             throw ActiveMQException(
                 __FILE__, __LINE__,
                 "ActiveMQConnectionFactory::createConnection - "
@@ -139,8 +140,7 @@
         }
 
         // Create and Return the new connection object.
-        connection.reset(
-            new ActiveMQConnection( transport.release(), properties.release() ) );
+        connection.reset( new ActiveMQConnection( transport, properties ) );
 
         return connection.release();
     }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.cpp Thu Feb 19 01:06:07 2009
@@ -33,8 +33,9 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConnectionSupport::ActiveMQConnectionSupport( transport::Transport* transport,
-                                                      decaf::util::Properties* properties ) {
+ActiveMQConnectionSupport::ActiveMQConnectionSupport(
+    const Pointer<transport::Transport>& transport,
+    const Pointer<decaf::util::Properties>& properties ) {
 
     if( transport  == NULL ) {
         throw decaf::lang::exceptions::IllegalArgumentException(
@@ -43,8 +44,8 @@
             "Required Parameter 'transport' was NULL.");
     }
 
-    this->properties.reset( properties );
-    this->transport.reset( transport );
+    this->properties = properties;
+    this->transport = transport;
 
     // Check the connection options
     this->setAlwaysSyncSend( Boolean::parseBoolean(

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h Thu Feb 19 01:06:07 2009
@@ -28,22 +28,25 @@
 
 #include <decaf/util/Properties.h>
 #include <decaf/lang/Exception.h>
+#include <decaf/lang/Pointer.h>
 
 #include <memory>
 
 namespace activemq {
 namespace core {
 
+    using decaf::lang::Pointer;
+
     class AMQCPP_API ActiveMQConnectionSupport :
         public transport::TransportListener
     {
     private:
 
         // Properties used to configure this connection.
-        std::auto_ptr<decaf::util::Properties> properties;
+        Pointer<decaf::util::Properties> properties;
 
         // Transport we are using
-        std::auto_ptr<transport::Transport> transport;
+        Pointer<transport::Transport> transport;
 
         /**
          * Boolean indicating that we are to always send message Synchronously.
@@ -132,8 +135,8 @@
          * @param properties
          *        The URI configured properties for this connection.
          */
-        ActiveMQConnectionSupport( transport::Transport* transport,
-                                   decaf::util::Properties* properties );
+        ActiveMQConnectionSupport( const Pointer<transport::Transport>& transport,
+                                   const Pointer<decaf::util::Properties>& properties );
 
         virtual ~ActiveMQConnectionSupport();
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Thu Feb 19 01:06:07 2009
@@ -34,9 +34,9 @@
 #include <cms/ExceptionListener.h>
 
 using namespace std;
-using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
@@ -44,7 +44,7 @@
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer::ActiveMQConsumer( commands::ConsumerInfo* consumerInfo,
+ActiveMQConsumer::ActiveMQConsumer( const Pointer<ConsumerInfo>& consumerInfo,
                                     ActiveMQSession* session,
                                     ActiveMQTransactionContext* transaction ) {
 
@@ -57,12 +57,12 @@
     // Initialize Producer Data
     this->session = session;
     this->transaction = transaction;
-    this->consumerInfo.reset( consumerInfo );
+    this->consumerInfo = consumerInfo;
     this->listener = NULL;
     this->closed = false;
 
     // Send our info to the Broker.
-    this->session->syncRequest( this->consumerInfo.get() );
+    this->session->syncRequest( this->consumerInfo );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -127,7 +127,7 @@
 
     try {
         // Fetch the Selector
-        return consumerInfo->getSelector();
+        return this->consumerInfo->getSelector();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -146,7 +146,7 @@
 
             // Calculate the deadline
             long long deadline = 0;
-            if (timeout > 0) {
+            if( timeout > 0 ) {
                 deadline = Date::getCurrentTimeMilliseconds() + timeout;
             }
 
@@ -173,7 +173,7 @@
                 // to the user.
                 DispatchData data = unconsumedMessages.pop();
 
-                decaf::lang::Pointer<commands::Message> message = data.getMessage();
+                Pointer<Message> message = data.getMessage();
 
                 // If it's expired, process the message and then go back to waiting.
                 if( message->isExpired() ) {
@@ -194,7 +194,7 @@
             }
         }
 
-        return decaf::lang::Pointer<commands::Message>();
+        return Pointer<Message>();
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -212,24 +212,24 @@
         this->sendPullRequest( 0 );
 
         // Wait for the next message.
-        decaf::lang::Pointer<commands::Message> msg = dequeue( -1 );
-        if( msg == NULL ) {
+        Pointer<Message> message = dequeue( -1 );
+        if( message == NULL ) {
             return NULL;
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed( msg );
+        beforeMessageIsConsumed( message );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
-        cms::Message* clonedMsg =
-            dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( msg, false );
+        afterMessageIsConsumed( message, false );
 
         // Return the cloned message.
-        return clonedMsg;
+        return clonedMessage;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -248,24 +248,24 @@
         this->sendPullRequest( millisecs );
 
         // Wait for the next message.
-        decaf::lang::Pointer<commands::Message> msg = dequeue( millisecs );
-        if( msg == NULL ) {
+        Pointer<Message> message = dequeue( millisecs );
+        if( message == NULL ) {
             return NULL;
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed( msg );
+        beforeMessageIsConsumed( message );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
-        cms::Message* clonedMsg =
-            dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( msg, false );
+        afterMessageIsConsumed( message, false );
 
         // Return the cloned message.
-        return clonedMsg;
+        return clonedMessage;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -284,24 +284,24 @@
         this->sendPullRequest( -1 );
 
         // Get the next available message, if there is one.
-        decaf::lang::Pointer<commands::Message> msg = dequeue( 0 );
-        if( msg == NULL ) {
+        Pointer<Message> message = dequeue( 0 );
+        if( message == NULL ) {
             return NULL;
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed( msg );
+        beforeMessageIsConsumed( message );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
-        cms::Message* clonedMsg =
-            dynamic_cast<cms::Message*>( msg->cloneDataStructure() );
+        cms::Message* clonedMessage =
+            dynamic_cast<cms::Message*>( message->cloneDataStructure() );
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( msg, false );
+        afterMessageIsConsumed( message, false );
 
         // Return the cloned message.
-        return clonedMsg;
+        return clonedMessage;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -340,7 +340,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::beforeMessageIsConsumed( decaf::lang::Pointer<commands::Message>& message ) {
+void ActiveMQConsumer::beforeMessageIsConsumed( const Pointer<Message>& message ) {
 
     // If the Session is in ClientAcknowledge mode, then we set the
     // handler in the message to this object and send it out.  Otherwise
@@ -375,7 +375,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::afterMessageIsConsumed( decaf::lang::Pointer<commands::Message>& message,
+void ActiveMQConsumer::afterMessageIsConsumed( const Pointer<Message>& message,
                                                bool messageExpired AMQCPP_UNUSED ) {
 
     try{
@@ -419,13 +419,13 @@
                 "ActiveMQConsumer::acknowledge - Message passed to Ack was NULL.");
         }
 
-        commands::MessageAck ack;
-        ack.setAckType( (int)ackType );
-        ack.setConsumerId( consumerInfo->getConsumerId() );
-        ack.setDestination( message->getDestination() );
-        ack.setFirstMessageId( message->getMessageId() );
-        ack.setLastMessageId( message->getMessageId() );
-        ack.setMessageCount( 1 );
+        Pointer<MessageAck> ack( new MessageAck() );
+        ack->setAckType( (int)ackType );
+        ack->setConsumerId( this->consumerInfo->getConsumerId() );
+        ack->setDestination( message->getDestination() );
+        ack->setFirstMessageId( message->getMessageId() );
+        ack->setLastMessageId( message->getMessageId() );
+        ack->setMessageCount( 1 );
 
         if( this->session->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
 
@@ -437,10 +437,10 @@
                         "Transacted Session, has no Transaction Info.");
             }
 
-            ack.setTransactionId( this->transaction->getTransactionId() );
+            ack->setTransactionId( this->transaction->getTransactionId() );
         }
 
-        this->session->oneway( &ack );
+        this->session->oneway( ack );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -452,10 +452,10 @@
 
     try {
 
-        decaf::lang::Pointer<commands::Message> message = data.getMessage();
+        Pointer<Message> message = data.getMessage();
 
         // Don't dispatch expired messages, ack it and then destroy it
-        if( message.get()->isExpired() ) {
+        if( message->isExpired() ) {
             this->acknowledge( message.get(), ActiveMQConstants::ACK_TYPE_CONSUMED );
 
             // stop now, don't queue
@@ -517,12 +517,12 @@
 
         if( this->consumerInfo->getPrefetchSize() == 0 ) {
 
-            commands::MessagePull messagePull;
-            messagePull.setConsumerId( this->consumerInfo->getConsumerId() );
-            messagePull.setDestination( this->consumerInfo->getDestination() );
-            messagePull.setTimeout( timeout );
+            Pointer<MessagePull> messagePull( new MessagePull() );
+            messagePull->setConsumerId( this->consumerInfo->getConsumerId() );
+            messagePull->setDestination( this->consumerInfo->getDestination() );
+            messagePull->setTimeout( timeout );
 
-            this->session->oneway( &messagePull );
+            this->session->oneway( messagePull );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Thu Feb 19 01:06:07 2009
@@ -28,6 +28,7 @@
 #include <activemq/core/ActiveMQAckHandler.h>
 #include <activemq/core/Dispatcher.h>
 
+#include <decaf/lang/Pointer.h>
 #include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Mutex.h>
 #include <memory>
@@ -35,6 +36,8 @@
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQSession;
     class ActiveMQTransactionContext;
 
@@ -58,7 +61,7 @@
         /**
          * The Consumer info for this Consumer
          */
-        std::auto_ptr<commands::ConsumerInfo> consumerInfo;
+        Pointer<commands::ConsumerInfo> consumerInfo;
 
         /**
          * The Message Listener for this Consumer
@@ -85,7 +88,7 @@
         /**
          * Constructor
          */
-        ActiveMQConsumer( commands::ConsumerInfo* consumerInfo,
+        ActiveMQConsumer( const Pointer<commands::ConsumerInfo>& consumerInfo,
                           ActiveMQSession* session,
                           ActiveMQTransactionContext* transaction );
 
@@ -224,7 +227,7 @@
          * @throws InvalidStateException if this consumer is closed upon
          * entering this method.
          */
-        decaf::lang::Pointer<commands::Message> dequeue( int timeout )
+        Pointer<commands::Message> dequeue( int timeout )
             throw ( cms::CMSException );
 
         /**
@@ -232,7 +235,7 @@
          * @param message - the message being consumed.
          */
         void beforeMessageIsConsumed(
-            decaf::lang::Pointer<commands::Message>& message );
+            const Pointer<commands::Message>& message );
 
         /**
          * Post-consume processing
@@ -240,7 +243,7 @@
          * @param messageExpired - flag indicating if the message has expired.
          */
         void afterMessageIsConsumed(
-            decaf::lang::Pointer<commands::Message>& message, bool messageExpired );
+            const Pointer<commands::Message>& message, bool messageExpired );
 
     private:
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.cpp Thu Feb 19 01:06:07 2009
@@ -32,7 +32,7 @@
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducer::ActiveMQProducer( commands::ProducerInfo* producerInfo,
+ActiveMQProducer::ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
                                     const cms::Destination* destination,
                                     ActiveMQSession* session ) {
 
@@ -44,7 +44,7 @@
 
     // Init Producer Data
     this->session = session;
-    this->producerInfo.reset( producerInfo );
+    this->producerInfo = producerInfo;
     this->destination.reset( destination != NULL ? destination->clone() : NULL );
     this->closed = false;
 
@@ -55,7 +55,7 @@
     this->defaultPriority = 4;
     this->defaultTimeToLive = 0;
 
-    this->session->syncRequest( this->producerInfo.get() );
+    this->session->syncRequest( this->producerInfo );
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQProducer.h Thu Feb 19 01:06:07 2009
@@ -33,6 +33,8 @@
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQSession;
 
     class AMQCPP_API ActiveMQProducer : public cms::MessageProducer {
@@ -60,7 +62,7 @@
         ActiveMQSession* session;
 
         // This Producers protocol specific info object
-        std::auto_ptr<commands::ProducerInfo> producerInfo;
+        Pointer<commands::ProducerInfo> producerInfo;
 
         // Boolean that indicates if the consumer has been closed
         bool closed;
@@ -84,7 +86,7 @@
          * @param session
          *        The Session which is the parent of this Producer.
          */
-        ActiveMQProducer( commands::ProducerInfo* producerInfo,
+        ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
                           const cms::Destination* destination,
                           ActiveMQSession* session );
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Thu Feb 19 01:06:07 2009
@@ -51,17 +51,17 @@
 #include <decaf/lang/exceptions/NullPointerException.h>
 
 using namespace std;
-using namespace cms;
 using namespace activemq;
 using namespace activemq::util;
 using namespace activemq::core;
+using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace decaf::util;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSession::ActiveMQSession( commands::SessionInfo* sessionInfo,
+ActiveMQSession::ActiveMQSession( const Pointer<SessionInfo>& sessionInfo,
                                   cms::Session::AcknowledgeMode ackMode,
                                   const Properties& properties,
                                   ActiveMQConnection* connection ) {
@@ -72,7 +72,7 @@
             "ActiveMQSession::ActiveMQSession - Init with NULL data");
     }
 
-    this->sessionInfo.reset( sessionInfo );
+    this->sessionInfo = sessionInfo;
     this->connection = connection;
     this->closed = false;
     this->ackMode = ackMode;
@@ -144,7 +144,7 @@
             }
         }
 
-        // TODO = Commit it first.
+        // TODO = Commit it first.  ??
         // Destroy the Transaction
         if( this->transaction.get() != NULL ){
             this->transaction->commit();
@@ -251,13 +251,13 @@
 
         this->checkClosed();
 
-        std::auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
+        Pointer<ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
 
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
 
         // Override default options with uri-encoded parameters.
-        this->applyDestinationOptions( consumerInfo.get() );
+        this->applyDestinationOptions( consumerInfo );
 
         // Register this as a message dispatcher for the consumer since we
         // could start receiving messages from the broker right away once we
@@ -266,7 +266,7 @@
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo.release(), this, this->transaction.get() ) );
+            new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
 
         // Add the consumer to the map.
         synchronized( &this->consumers ) {
@@ -292,14 +292,14 @@
 
         this->checkClosed();
 
-        std::auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
+        Pointer<ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
 
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
         consumerInfo->setSubscriptionName( name );
 
         // Override default options with uri-encoded parameters.
-        this->applyDestinationOptions( consumerInfo.get() );
+        this->applyDestinationOptions( consumerInfo );
 
         // Register this as a message dispatcher for the consumer since we
         // could start receiving messages from the broker right away once we
@@ -308,7 +308,7 @@
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo.release(), this, this->transaction.get() ) );
+            new ActiveMQConsumer( consumerInfo, this, this->transaction.get() ) );
 
         // Add the consumer to the map.
         synchronized( &this->consumers ) {
@@ -336,7 +336,7 @@
         producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
         producerId->setValue( this->connection->getNextProducerId() );
 
-        std::auto_ptr<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
+        Pointer<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
         producerInfo->setProducerId( producerId );
         producerInfo->setWindowSize( this->connection->getProducerWindowSize() );
 
@@ -365,7 +365,7 @@
 
         // Create the producer instance.
         std::auto_ptr<ActiveMQProducer> producer(
-            new ActiveMQProducer( producerInfo.release(), destination, this ) );
+            new ActiveMQProducer( producerInfo, destination, this ) );
 
         producer->setSendTimeout( this->connection->getSendTimeout() );
 
@@ -491,7 +491,7 @@
     try{
 
         this->checkClosed();
-        BytesMessage* msg = createBytesMessage();
+        cms::BytesMessage* msg = createBytesMessage();
         msg->setBodyBytes( bytes, bytesSize );
         return msg;
     }
@@ -521,7 +521,7 @@
     try {
 
         this->checkClosed();
-        TextMessage* msg = createTextMessage();
+        cms::TextMessage* msg = createTextMessage();
         msg->setText( text.c_str() );
         return msg;
     }
@@ -585,7 +585,6 @@
         id->setProducerSequenceId( this->connection->getNextProducerSequenceId() );
 
         amqMessage->setMessageId( id );
-        amqMessage->setProducerId( producer->getProducerInfo().getProducerId() );
 
         if( this->getAcknowledgeMode() == cms::Session::SESSION_TRANSACTED ) {
 
@@ -599,23 +598,34 @@
             amqMessage->setTransactionId( this->transaction->getTransactionId() );
         }
 
+        // NOTE:
+        // Now we copy the message before sending, this allows the user to reuse the
+        // message object without interfering with the copy that's being sent.  We
+        // could make this step optional to increase performance but for now we won't.
+        // To not do this implies that the user must never reuse the message object, or
+        // know that the configuration of Transports doesn't involve the message hanging
+        // around beyond the point that send returns.
+        Pointer<commands::Message> msgCopy( amqMessage->cloneDataStructure() );
+
+        msgCopy->setProducerId( producer->getProducerInfo().getProducerId() );
+
         if( this->connection->getSendTimeout() <= 0 &&
-            !amqMessage->isResponseRequired() &&
+            !msgCopy->isResponseRequired() &&
             !this->connection->isAlwaysSyncSend() &&
-            ( !amqMessage->isPersistent() || this->connection->isUseAsyncSend() ||
-              amqMessage->getTransactionId() != NULL ) ) {
+            ( !msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
+                msgCopy->getTransactionId() != NULL ) ) {
 
             if( usage != NULL ) {
-                usage->enqueueUsage( amqMessage->getSize() );
+                usage->enqueueUsage( msgCopy->getSize() );
             }
 
             // No Response Required.
-            this->connection->oneway( amqMessage );
+            this->connection->oneway( msgCopy );
 
         } else {
 
             // Send the message to the broker.
-            this->connection->syncRequest( amqMessage, this->connection->getSendTimeout() );
+            this->connection->syncRequest( msgCopy, this->connection->getSendTimeout() );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -635,21 +645,20 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::unsubscribe( const std::string& name )
-    throw ( CMSException ) {
+    throw ( cms::CMSException ) {
 
     try{
 
         this->checkClosed();
 
-        std::auto_ptr<commands::RemoveSubscriptionInfo> rsi(
-            new commands::RemoveSubscriptionInfo() );
+        Pointer<RemoveSubscriptionInfo> rsi( new RemoveSubscriptionInfo() );
 
         rsi->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
         rsi->setSubcriptionName( name );
         rsi->setClientId( this->connection->getConnectionInfo().getClientId() );
 
         // Send the message to the broker.
-        this->connection->syncRequest( rsi.get() );
+        this->connection->syncRequest( rsi );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -709,15 +718,15 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-commands::ConsumerInfo* ActiveMQSession::createConsumerInfo(
-    const cms::Destination* destination )throw ( activemq::exceptions::ActiveMQException ) {
+ConsumerInfo* ActiveMQSession::createConsumerInfo(
+    const cms::Destination* destination ) throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
 
         this->checkClosed();
 
-        std::auto_ptr<commands::ConsumerInfo> consumerInfo( new commands::ConsumerInfo() );
-        decaf::lang::Pointer<commands::ConsumerId> consumerId( new commands::ConsumerId() );
+        std::auto_ptr<ConsumerInfo> consumerInfo( new commands::ConsumerInfo() );
+        decaf::lang::Pointer<ConsumerId> consumerId( new commands::ConsumerId() );
 
         consumerId->setConnectionId(
             this->connection->getConnectionId().getValue() );
@@ -728,8 +737,8 @@
 
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
-        const commands::ActiveMQDestination* amqDestination =
-            dynamic_cast<const commands::ActiveMQDestination*>( destination );
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
 
         if( amqDestination == NULL ) {
             throw activemq::exceptions::ActiveMQException( __FILE__, __LINE__,
@@ -737,8 +746,7 @@
         }
 
         consumerInfo->setDestination(
-            decaf::lang::Pointer<commands::ActiveMQDestination>(
-                amqDestination->cloneDataStructure() ) );
+            Pointer<ActiveMQDestination>( amqDestination->cloneDataStructure() ) );
 
         return consumerInfo.release();
     }
@@ -748,7 +756,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::applyDestinationOptions( commands::ConsumerInfo* info ) {
+void ActiveMQSession::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
 
     decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
 
@@ -853,15 +861,14 @@
 
     try {
 
-        commands::DestinationInfo command;
-        command.setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        command.setOperationType( ActiveMQConstants::DESTINATION_ADD_OPERATION );
-        command.setDestination(
-            decaf::lang::Pointer<commands::ActiveMQTempDestination>(
-                tempDestination->cloneDataStructure() ) );
+        Pointer<DestinationInfo> command( new DestinationInfo() );
+        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        command->setOperationType( ActiveMQConstants::DESTINATION_ADD_OPERATION );
+        command->setDestination(
+            Pointer<ActiveMQTempDestination>( tempDestination->cloneDataStructure() ) );
 
         // Send the message to the broker.
-        this->connection->syncRequest( &command );
+        this->syncRequest( command );
 
         // Now that its setup, link it to this Connection so it can be closed.
         tempDestination->setConnection( this->connection );
@@ -878,16 +885,15 @@
 
     try {
 
-        commands::DestinationInfo command;
+        Pointer<DestinationInfo> command( new DestinationInfo() );
 
-        command.setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        command.setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
-        command.setDestination(
-            decaf::lang::Pointer<commands::ActiveMQTempDestination>(
-                tempDestination->cloneDataStructure() ) );
+        command->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
+        command->setOperationType( ActiveMQConstants::DESTINATION_REMOVE_OPERATION );
+        command->setDestination(
+            Pointer<ActiveMQTempDestination>( tempDestination->cloneDataStructure() ) );
 
         // Send the message to the broker.
-        this->connection->syncRequest( &command );
+        this->connection->syncRequest( command );
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -912,7 +918,7 @@
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
-        std::auto_ptr<commands::LocalTransactionId> id( new commands::LocalTransactionId() );
+        std::auto_ptr<LocalTransactionId> id( new LocalTransactionId() );
 
         id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
         id->setValue( this->connection->getNextTransactionId() );
@@ -925,7 +931,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::oneway( commands::Command* command )
+void ActiveMQSession::oneway( Pointer<Command> command )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -938,7 +944,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::syncRequest( commands::Command* command, unsigned int timeout )
+void ActiveMQSession::syncRequest( Pointer<Command> command, unsigned int timeout )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -960,7 +966,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( decaf::lang::Pointer<commands::ConsumerId> id )
+void ActiveMQSession::disposeOf( Pointer<ConsumerId> id )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -1008,7 +1014,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( decaf::lang::Pointer<commands::ProducerId> id )
+void ActiveMQSession::disposeOf( Pointer<ProducerId> id )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -1031,7 +1037,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer* ActiveMQSession::getConsumer( const decaf::lang::Pointer<commands::ConsumerId>& id ) {
+ActiveMQConsumer* ActiveMQSession::getConsumer( const Pointer<ConsumerId>& id ) {
 
     synchronized( &this->consumers ) {
         if( this->consumers.containsKey( id ) ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Thu Feb 19 01:06:07 2009
@@ -41,6 +41,8 @@
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQTransactionContext;
     class ActiveMQConnection;
     class ActiveMQConsumer;
@@ -52,11 +54,11 @@
     class AMQCPP_API ActiveMQSession : public cms::Session, public Dispatcher {
     private:
 
-        typedef decaf::util::StlMap< decaf::lang::Pointer<commands::ConsumerId>,
+        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
                                      ActiveMQConsumer*,
                                      commands::ConsumerId::COMPARATOR> ConsumersMap;
 
-        typedef decaf::util::StlMap< decaf::lang::Pointer<commands::ProducerId>,
+        typedef decaf::util::StlMap< Pointer<commands::ProducerId>,
                                      ActiveMQProducer*,
                                      commands::ProducerId::COMPARATOR> ProducersMap;
 
@@ -65,7 +67,7 @@
         /**
          * SessionInfo for this Session
          */
-        std::auto_ptr<commands::SessionInfo> sessionInfo;
+        Pointer<commands::SessionInfo> sessionInfo;
 
         /**
          * Transaction Management object
@@ -104,7 +106,7 @@
 
     public:
 
-        ActiveMQSession( commands::SessionInfo* sessionInfo,
+        ActiveMQSession( const Pointer<commands::SessionInfo>& sessionInfo,
                          cms::Session::AcknowledgeMode ackMode,
                          const decaf::util::Properties& properties,
                          ActiveMQConnection* connection );
@@ -162,7 +164,7 @@
          */
         virtual void dispatch( DispatchData& message );
 
-    public:   // Implements Mehtods
+    public:   // Implements Methods
 
         /**
          * Closes this session as well as any active child consumers or
@@ -413,10 +415,10 @@
         /**
          * Sends a oneway message.
          * @param command The message to send.
-         * @throws ConnectorException if not currently connected, or
+         * @throws ActiveMQException if not currently connected, or
          * if the operation fails for any reason.
          */
-        void oneway( commands::Command* command )
+        void oneway( Pointer<commands::Command> command )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -424,10 +426,10 @@
          * Converts any error responses into an exception.
          * @param command The request command.
          * @param timeout The time to wait for a response, default is zero or infinite.
-         * @throws ConnectorException thrown if an error response was received
+         * @throws ActiveMQException thrown if an error response was received
          * from the broker, or if any other error occurred.
          */
-        void syncRequest( commands::Command* command, unsigned int timeout = 0 )
+        void syncRequest( Pointer<commands::Command> command, unsigned int timeout = 0 )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -460,7 +462,7 @@
 
        // Using options from the Destination URI override any settings that are
        // defined for this consumer.
-       void applyDestinationOptions( commands::ConsumerInfo* info );
+       void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
 
        // Send the Destination Creation Request to the Broker, alerting it
        // that we've created a new Temporary Destination.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.cpp Thu Feb 19 01:06:07 2009
@@ -34,7 +34,6 @@
     this->session = session;
     this->closed = false;
     this->started = false;
-    this->thread = NULL;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -62,8 +61,6 @@
 
     if( thread != NULL ) {
         thread->join();
-        delete thread;
-        thread = NULL;
     }
 }
 
@@ -117,7 +114,7 @@
 
         // Don't create the thread unless we need to.
         if( thread == NULL ) {
-            thread = new Thread( this );
+            thread.reset( new Thread( this ) );
             thread->start();
         }
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSessionExecutor.h Thu Feb 19 01:06:07 2009
@@ -23,13 +23,17 @@
 #include <activemq/commands/ConsumerId.h>
 #include <decaf/lang/Thread.h>
 #include <decaf/lang/Runnable.h>
+#include <decaf/lang/Pointer.h>
 #include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/StlList.h>
 #include <vector>
 #include <list>
 
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQSession;
     class ActiveMQConsumer;
 
@@ -43,12 +47,25 @@
     {
     private:
 
+        /** Session that is this executors parent. */
         ActiveMQSession* session;
+
+        /** List used to hold messages waiting to be dispatched. */
         std::list<DispatchData> messageQueue;
-        decaf::lang::Thread* thread;
+
+        /** The Dispatcher Thread */
+        Pointer<decaf::lang::Thread> thread;
+
+        /** Mutex used to lock on access to the Message Queue */
         decaf::util::concurrent::Mutex mutex;
+
+        /** Locks when messages are being dispatched to consumers. */
         decaf::util::concurrent::Mutex dispatchMutex;
+
+        /** Has the Start method been called */
         volatile bool started;
+
+        /** Has the Close method been called */
         volatile bool closed;
 
     public:

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Feb 19 01:06:07 2009
@@ -28,6 +28,7 @@
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace activemq::commands;
 using namespace decaf;
@@ -127,7 +128,7 @@
 
         // Commit the current Transaction
         this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
-        this->connection->oneway( this->transactionInfo.get() );
+        this->connection->oneway( this->transactionInfo );
 
         // Notify each registered Synchronization that we have committed this Transaction.
         synchronized( &this->synchronizations ) {
@@ -173,7 +174,7 @@
 
         // Rollback the Transaction
         this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
-        this->connection->oneway( this->transactionInfo.get() );
+        this->connection->oneway( this->transactionInfo );
 
         // Notify each registered Synchronization that we are committing this Transaction.
         synchronized( &this->synchronizations ) {
@@ -205,10 +206,10 @@
 
     try{
 
-        this->transactionInfo.reset( new commands::TransactionInfo() );
+        this->transactionInfo.reset( new TransactionInfo() );
 
         // Create the Id
-        decaf::lang::Pointer<commands::LocalTransactionId> id( new commands::LocalTransactionId() );
+        Pointer<LocalTransactionId> id( new LocalTransactionId() );
         id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
         id->setValue( this->connection->getNextTransactionId() );
 
@@ -217,7 +218,7 @@
         this->transactionInfo->setTransactionId( id );
         this->transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
 
-        this->connection->oneway( this->transactionInfo.get() );
+        this->connection->oneway( this->transactionInfo );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransactionContext.h Thu Feb 19 01:06:07 2009
@@ -37,6 +37,8 @@
 namespace activemq{
 namespace core{
 
+    using decaf::lang::Pointer;
+
     class ActiveMQSession;
     class ActiveMQConnection;
 
@@ -63,7 +65,7 @@
         ActiveMQConnection* connection;
 
         // Transaction Info for the current Transaction
-        std::auto_ptr<commands::TransactionInfo> transactionInfo;
+        Pointer<commands::TransactionInfo> transactionInfo;
 
         // List of Registered Synchronizations
         decaf::util::StlSet<Synchronization*> synchronizations;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp?rev=745701&r1=745700&r2=745701&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.cpp Thu Feb 19 01:06:07 2009
@@ -17,14 +17,244 @@
 
 #include "ConnectionStateTracker.h"
 
+#include <decaf/lang/Runnable.h>
+#include <decaf/lang/exceptions/NoSuchElementException.h>
+
 using namespace activemq;
 using namespace activemq::state;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+const Pointer<Tracked> ConnectionStateTracker::TRACKED_RESPONSE_MARKER( new Tracked() );
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace state {
+
+    class RemoveTransactionAction : public Runnable {
+    private:
+
+        const Pointer<TransactionInfo> info;
+        ConnectionStateTracker* stateTracker;
+
+    public:
+
+        RemoveTransactionAction( ConnectionStateTracker* stateTracker,
+                                 const Pointer<TransactionInfo>& info ) :
+            info( info ), stateTracker( stateTracker ) {}
+
+        virtual ~RemoveTransactionAction() {}
+
+        virtual void run() {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            Pointer<ConnectionState> cs = stateTracker->connectionStates.get( connectionId );
+            cs->removeTransactionState( info->getTransactionId() );
+        }
+    };
+
+}}
 
 ////////////////////////////////////////////////////////////////////////////////
 ConnectionStateTracker::ConnectionStateTracker() {
+
+    this->trackTransactions = false;
+    this->restoreSessions = true;
+    this->restoreConsumers = true;
+    this->restoreProducers = true;
+    this->restoreTransaction = true;
+    this->trackMessages = true;
+    this->maxCacheSize = 128 * 1024;
+    this->currentCacheSize = 0;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 ConnectionStateTracker::~ConnectionStateTracker() {
 }
 
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processBeginTransaction( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        transactionState->addCommand(
+                            Pointer<Command>( info->cloneDataStructure() ) );
+                    }
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processPrepareTransaction( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        transactionState->addCommand(
+                            Pointer<Command>( info->cloneDataStructure() ) );
+                    }
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processCommitTransactionOnePhase( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        Pointer<TransactionInfo> infoCopy =
+                            Pointer<TransactionInfo>( info->cloneDataStructure() );
+                        transactionState->addCommand( infoCopy );
+                        return Pointer<Tracked>( new Tracked(
+                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                    }
+                }
+            }
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processCommitTransactionTwoPhase( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        Pointer<TransactionInfo> infoCopy =
+                            Pointer<TransactionInfo>( info->cloneDataStructure() );
+                        transactionState->addCommand( infoCopy );
+                        return Pointer<Tracked>( new Tracked(
+                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                    }
+                }
+            }
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processRollbackTransaction( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        Pointer<TransactionInfo> infoCopy =
+                            Pointer<TransactionInfo>( info->cloneDataStructure() );
+                        transactionState->addCommand( infoCopy );
+                        return Pointer<Tracked>( new Tracked(
+                            Pointer<Runnable>( new RemoveTransactionAction( this, infoCopy ) ) ) );
+                    }
+                }
+            }
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<Command> ConnectionStateTracker::processEndTransaction( TransactionInfo* info )
+    throw ( exceptions::ActiveMQException ) {
+
+    try{
+
+        if( trackTransactions && info != NULL ) {
+            Pointer<ConnectionId> connectionId = info->getConnectionId();
+            if( connectionId != NULL ) {
+                Pointer<ConnectionState> cs = connectionStates.get( connectionId );
+                if( cs != NULL ) {
+                    Pointer<TransactionState> transactionState =
+                        cs->getTransactionState( info->getTransactionId() );
+                    if( transactionState != NULL ) {
+                        transactionState->addCommand(
+                            Pointer<Command>( info->cloneDataStructure() ) );
+                    }
+                }
+            }
+
+            return TRACKED_RESPONSE_MARKER;
+        }
+
+        return Pointer<Response>();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+



Mime
View raw message