Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 69890 invoked from network); 19 Feb 2009 01:06:39 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 19 Feb 2009 01:06:39 -0000 Received: (qmail 48810 invoked by uid 500); 19 Feb 2009 01:06:38 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 48789 invoked by uid 500); 19 Feb 2009 01:06:38 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 48780 invoked by uid 99); 19 Feb 2009 01:06:38 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 Feb 2009 17:06:38 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 19 Feb 2009 01:06:32 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 5C68C2388975; Thu, 19 Feb 2009 01:06:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r745701 [2/3] - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/core/ main/activemq/state/ main/activemq/transport/ main/activemq/transport/correlator/ main/activemq/transport/failover/ main/activemq/transport/logging/ main/activem... Date: Thu, 19 Feb 2009 01:06:08 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090219010611.5C68C2388975@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/state/ConnectionStateTracker.h Thu Feb 19 01:06:07 2009 @@ -19,25 +19,192 @@ #define _ACTIVEMQ_STATE_CONNECTIONSTATETRACKER_H_ #include +#include +#include #include #include #include #include #include #include +#include +#include + +#include +#include namespace activemq { namespace state { + class RemoveTransactionAction; + using decaf::lang::Pointer; + class AMQCPP_API ConnectionStateTracker : public CommandVisitorAdapter { private: + static const Pointer TRACKED_RESPONSE_MARKER; + + // TODO - Create a Thread Safe impl of Map. + decaf::util::StlMap< Pointer, + Pointer, + commands::ConnectionId::COMPARATOR > connectionStates; + + bool trackTransactions; + bool restoreSessions; + bool restoreConsumers; + bool restoreProducers; + bool restoreTransaction; + bool trackMessages; + int maxCacheSize; + int currentCacheSize; + + friend class RemoveTransactionAction; + public: ConnectionStateTracker(); virtual ~ConnectionStateTracker(); + Tracked track( const Pointer& command ) throw( decaf::io::IOException ); + + void trackBack( const Pointer& command ); + + void restore( const Pointer& transport ) + throw( decaf::io::IOException ); + + virtual Pointer processAddDestination( DestinationInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRemoveDestination( DestinationInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processAddProducer( ProducerInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRemoveProducer( ProducerId* id ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processAddConsumer( ConsumerInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRemoveConsumer( ConsumerId* id ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processAddSession( SessionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRemoveSession( SessionId* id ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processAddConnection( ConnectionInfo* info) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRemoveConnection( ConnectionId* id ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processMessage( Message* send ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processMessageAck( MessageAck* ack ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processBeginTransaction( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processPrepareTransaction( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processCommitTransactionOnePhase( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processCommitTransactionTwoPhase( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processRollbackTransaction( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + virtual Pointer processEndTransaction( TransactionInfo* info ) + throw ( exceptions::ActiveMQException ); + + bool isRestoreConsumers() const { + return this->restoreConsumers; + } + + void setRestoreConsumers( bool restoreConsumers ) { + this->restoreConsumers = restoreConsumers; + } + + bool isRestoreProducers() const { + return this->restoreProducers; + } + + void setRestoreProducers( bool restoreProducers ) { + this->restoreProducers = restoreProducers; + } + + bool isRestoreSessions() const { + return this->restoreSessions; + } + + void setRestoreSessions( bool restoreSessions ) { + this->restoreSessions = restoreSessions; + } + + bool isTrackTransactions() const { + return this->trackTransactions; + } + + void setTrackTransactions( bool trackTransactions ) { + this->trackTransactions = trackTransactions; + } + + bool isRestoreTransaction() const { + return this->restoreTransaction; + } + + void setRestoreTransaction( bool restoreTransaction ) { + this->restoreTransaction = restoreTransaction; + } + + bool isTrackMessages() const { + return this->trackMessages; + } + + void setTrackMessages( bool trackMessages ) { + this->trackMessages = trackMessages; + } + + int getMaxCacheSize() const { + return this->maxCacheSize; + } + + void setMaxCacheSize( int maxCacheSize ) { + this->maxCacheSize = maxCacheSize; + } + + private: + + void doRestoreTransactions( const Pointer& transport, + const Pointer& connectionState ) + throw( decaf::io::IOException ); + + void doRestoreSessions( const Pointer& transport, + const Pointer& connectionState ) + throw( decaf::io::IOException ); + + void doRestoreConsumers( const Pointer& transport, + const Pointer& sessionState ) + throw( decaf::io::IOException ); + + void doRestoreProducers( const Pointer& transport, + const Pointer& sessionState ) + throw( decaf::io::IOException ); + + void doRestoreTempDestinations( const Pointer& transport, + const Pointer& connectionState ) + throw( decaf::io::IOException ); + }; }} Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.cpp Thu Feb 19 01:06:07 2009 @@ -22,10 +22,11 @@ using namespace activemq; using namespace activemq::state; using namespace activemq::exceptions; +using namespace decaf; +using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -Tracked::Tracked( decaf::lang::Runnable* runnable ) { - this->runnable = runnable; +Tracked::Tracked( const Pointer& runnable ) : runnable( runnable ) { } //////////////////////////////////////////////////////////////////////////////// @@ -35,7 +36,7 @@ if( this->runnable != NULL ) { this->runnable->run(); - this->runnable = NULL; + this->runnable.reset( NULL ); } } AMQ_CATCH_RETHROW( ActiveMQException ) Modified: activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/state/Tracked.h Thu Feb 19 01:06:07 2009 @@ -21,18 +21,22 @@ #include #include #include +#include namespace activemq { namespace state { + using decaf::lang::Pointer; + class AMQCPP_API Tracked : public commands::Response { private: - decaf::lang::Runnable* runnable; + Pointer runnable; public: - Tracked( decaf::lang::Runnable* runnable ); + Tracked() {} + Tracked( const Pointer& runnable ); virtual ~Tracked() {} Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.cpp Thu Feb 19 01:06:07 2009 @@ -39,7 +39,7 @@ using namespace decaf::util; //////////////////////////////////////////////////////////////////////////////// -Transport* AbstractTransportFactory::create( const decaf::net::URI& location ) +Pointer AbstractTransportFactory::create( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ) { try{ @@ -47,18 +47,18 @@ Properties properties = activemq::util::URISupport::parseQuery( location.getQuery() ); - WireFormat* wireFormat = this->createWireFormat( properties ); + Pointer wireFormat = this->createWireFormat( properties ); // Create the initial Transport, then wrap it in the normal Filters - Transport* transport = doCreateComposite( location, wireFormat, properties ); + Pointer transport( doCreateComposite( location, wireFormat, properties ) ); // Create the Transport for response correlator - transport = new ResponseCorrelator( transport ); + transport.reset( new ResponseCorrelator( transport ) ); // If command tracing was enabled, wrap the transport with a logging transport. if( properties.getProperty( "transport.commandTracingEnabled", "false" ) == "true" ) { // Create the Transport for response correlator - transport = new LoggingTransport( transport ); + transport.reset( new LoggingTransport( transport ) ); } // If there is a negotiator need then we create and wrap here. @@ -74,17 +74,18 @@ } //////////////////////////////////////////////////////////////////////////////// -Transport* AbstractTransportFactory::createComposite( const decaf::net::URI& location ) +Pointer AbstractTransportFactory::createComposite( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ) { try{ + Properties properties = activemq::util::URISupport::parseQuery( location.getQuery() ); - WireFormat* wireFormat = this->createWireFormat( properties ); + Pointer wireFormat = this->createWireFormat( properties ); // Create the initial Transport, then wrap it in the normal Filters - Transport* transport = doCreateComposite( location, wireFormat, properties ); + Pointer transport( doCreateComposite( location, wireFormat, properties ) ); // If there is a negotiator need then we create and wrap here. if( wireFormat->hasNegotiator() ) { @@ -99,7 +100,7 @@ } //////////////////////////////////////////////////////////////////////////////// -WireFormat* AbstractTransportFactory::createWireFormat( +Pointer AbstractTransportFactory::createWireFormat( const decaf::util::Properties& properties ) throw( decaf::lang::exceptions::NoSuchElementException ) { @@ -110,7 +111,7 @@ WireFormatFactory* factory = WireFormatRegistry::getInstance().findFactory( wireFormat ); - return factory->createWireFormat( properties ); + return Pointer( factory->createWireFormat( properties ) ); } AMQ_CATCH_RETHROW( NoSuchElementException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, NoSuchElementException ) Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/AbstractTransportFactory.h Thu Feb 19 01:06:07 2009 @@ -23,11 +23,14 @@ #include #include #include +#include #include namespace activemq { namespace transport { + using decaf::lang::Pointer; + /** * Abstract implementation of the TransportFactory interface, providing * the base functionality that's common to most of the TransportFactory @@ -46,7 +49,7 @@ * @param location - URI location to connect to plus any properties to assign. * @throws ActiveMQexception if an error occurs */ - virtual Transport* create( const decaf::net::URI& location ) + virtual Pointer create( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ); /** @@ -55,7 +58,7 @@ * @param location - URI location to connect to plus any properties to assign. * @throws ActiveMQexception if an error occurs */ - virtual Transport* createComposite( const decaf::net::URI& location ) + virtual Pointer createComposite( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ); protected: @@ -70,9 +73,9 @@ * @param properties - Properties to apply to the transport. * @throws ActiveMQexception if an error occurs */ - virtual Transport* doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) + virtual Pointer doCreateComposite( const decaf::net::URI& location, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) throw ( exceptions::ActiveMQException ) = 0; /** @@ -86,7 +89,7 @@ * * @throws NoSuchElementException if the configured WireFormat is not found. */ - virtual wireformat::WireFormat* createWireFormat( + virtual Pointer createWireFormat( const decaf::util::Properties& properties ) throw( decaf::lang::exceptions::NoSuchElementException ); Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h Thu Feb 19 01:06:07 2009 @@ -21,10 +21,14 @@ #include #include #include +#include namespace activemq { namespace transport { + using decaf::lang::Pointer; + using activemq::commands::Command; + class AMQCPP_API DefaultTransportListener : public TransportListener { public: @@ -38,7 +42,7 @@ * * @param command the received command object. */ - virtual void onCommand( commands::Command* command AMQCPP_UNUSED ) {} + virtual void onCommand( const Pointer& command AMQCPP_UNUSED ) {} /** * Event handler for an exception from a command transport. Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Thu Feb 19 01:06:07 2009 @@ -27,6 +27,7 @@ using namespace activemq; using namespace activemq::transport; using namespace activemq::exceptions; +using namespace activemq::commands; using namespace activemq::wireformat; using namespace decaf::lang; using namespace decaf::util::concurrent; @@ -45,14 +46,14 @@ } //////////////////////////////////////////////////////////////////////////////// -IOTransport::IOTransport( WireFormat* wireFormat ) { +IOTransport::IOTransport( const Pointer& wireFormat ) { this->listener = NULL; this->inputStream = NULL; this->outputStream = NULL; this->closed = false; this->thread = NULL; - this->wireFormat.reset( wireFormat ); + this->wireFormat = wireFormat; } //////////////////////////////////////////////////////////////////////////////// @@ -75,7 +76,7 @@ } //////////////////////////////////////////////////////////////////////////////// -void IOTransport::fire( commands::Command* command ){ +void IOTransport::fire( const Pointer& command ){ try{ // Since the listener is responsible for freeing the memory, @@ -83,21 +84,17 @@ // we have been closed then we don't deliver any messages that // might have sneaked in while we where closing. if( this->listener == NULL || this->closed == true ){ - delete command; return; } this->listener->onCommand( command ); - }catch( ... ){ - try{ - delete command; - } catch( ... ) {} } + AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// -void IOTransport::oneway( commands::Command* command ) +void IOTransport::oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { try{ @@ -224,7 +221,7 @@ while( !closed ){ // Read the next command from the input stream. - commands::Command* command = wireFormat->unmarshal( this->inputStream ); + Pointer command( wireFormat->unmarshal( this->inputStream ) ); // Notify the listener. fire( command ); @@ -252,7 +249,7 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* IOTransport::request( commands::Command* command AMQCPP_UNUSED ) +Pointer IOTransport::request( const Pointer& command AMQCPP_UNUSED ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){ throw decaf::lang::exceptions::UnsupportedOperationException( @@ -261,7 +258,7 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* IOTransport::request( commands::Command* command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED ) +Pointer IOTransport::request( const Pointer& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){ throw decaf::lang::exceptions::UnsupportedOperationException( Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Thu Feb 19 01:06:07 2009 @@ -22,7 +22,9 @@ #include #include #include +#include #include +#include #include #include @@ -32,11 +34,12 @@ #include namespace activemq{ -namespace wireformat{ - class WireFormat; -} namespace transport{ + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; + /** * Implementation of the Transport interface that performs * marshaling of commands to IO streams. This class does not @@ -59,7 +62,7 @@ /** * WireFormat instance to use to Encode / Decode. */ - std::auto_ptr wireFormat; + Pointer wireFormat; /** * Listener of this transport. @@ -93,11 +96,12 @@ * @param ex the exception to send */ void fire( decaf::lang::Exception& ex ); + /** * Notify the command listener. * @param command the command the send */ - void fire( commands::Command* command ); + void fire( const Pointer& command ); public: @@ -112,7 +116,7 @@ * @param wireFormat * Data encoder / decoder to use when reading and writing. */ - IOTransport( wireformat::WireFormat* wireFormat ); + IOTransport( const Pointer& wireFormat ); virtual ~IOTransport(); @@ -125,7 +129,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); /** @@ -134,7 +138,7 @@ * @returns the response to the command sent. * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); /** @@ -144,15 +148,15 @@ * @returns the response to the command sent. * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); /** * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ - virtual void setWireFormat( wireformat::WireFormat* wireFormat ){ - this->wireFormat.reset( wireFormat ); + virtual void setWireFormat( const Pointer& wireFormat ){ + this->wireFormat = wireFormat; } /** Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Thu Feb 19 01:06:07 2009 @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -38,6 +39,10 @@ // Forward declarations. class TransportListener; + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; + /** * Interface for a transport layer for command objects. Callers can * send oneway messages or make synchronous requests. Non-response @@ -63,7 +68,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) = 0; @@ -76,7 +81,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) = 0; @@ -90,7 +95,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer&, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) = 0; @@ -98,7 +103,7 @@ * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ - virtual void setWireFormat( wireformat::WireFormat* wireFormat ) = 0; + virtual void setWireFormat( const Pointer& wireFormat ) = 0; /** * Sets the observer of asynchronous events from this transport. Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFactory.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFactory.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFactory.h Thu Feb 19 01:06:07 2009 @@ -23,6 +23,7 @@ #include #include #include +#include namespace activemq{ namespace transport{ @@ -49,7 +50,7 @@ * @param location - URI location to connect to plus any properties to assign. * @throws ActiveMQexception if an error occurs */ - virtual Transport* create( const decaf::net::URI& location ) + virtual Pointer create( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ) = 0; /** @@ -58,7 +59,7 @@ * @param location - URI location to connect to plus any properties to assign. * @throws ActiveMQexception if an error occurs */ - virtual Transport* createComposite( const decaf::net::URI& location ) + virtual Pointer createComposite( const decaf::net::URI& location ) throw ( exceptions::ActiveMQException ) = 0; }; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp Thu Feb 19 01:06:07 2009 @@ -20,31 +20,17 @@ using namespace activemq; using namespace activemq::transport; +using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -TransportFilter::TransportFilter( Transport* next, const bool own ) { - - this->next = next; - this->own = own; - this->listener = NULL; +TransportFilter::TransportFilter( const Pointer& next ) : + next( next ), listener( NULL ) { // Observe the nested transport for events. next->setTransportListener( this ); } //////////////////////////////////////////////////////////////////////////////// -TransportFilter::~TransportFilter() { - - try{ - if( own ){ - delete next; - next = NULL; - } - } - AMQ_CATCHALL_NOTHROW() -} - -//////////////////////////////////////////////////////////////////////////////// void TransportFilter::onTransportException( Transport* source AMQCPP_UNUSED, const decaf::lang::Exception& ex ) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Thu Feb 19 01:06:07 2009 @@ -23,11 +23,16 @@ #include #include #include +#include #include namespace activemq{ namespace transport{ + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; + /** * A filter on the transport layer. Transport * filters implement the Transport interface and @@ -40,13 +45,7 @@ /** * The transport that this filter wraps around. */ - Transport* next; - - /** - * Flag to indicate whether this object controls - * the lifetime of the next transport object. - */ - bool own; + Pointer next; /** * Listener of this transport. @@ -56,7 +55,7 @@ protected: /** - * Notify the excpetion listener + * Notify the exception listener * @param ex - the exception to send to listeners */ void fire( const decaf::lang::Exception& ex ){ @@ -72,12 +71,10 @@ * Notify the command listener. * @param command - the command to send to the listener */ - void fire( commands::Command* command ){ + void fire( const Pointer& command ){ try{ if( listener != NULL ){ listener->onCommand( command ); - } else { - delete command; } }catch( ... ){} } @@ -89,15 +86,15 @@ * @param next - the next Transport in the chain * @param own - true if this filter owns the next and should delete it */ - TransportFilter( Transport* next, const bool own = true ); + TransportFilter( const Pointer& next ); - virtual ~TransportFilter(); + virtual ~TransportFilter() {} /** * Event handler for the receipt of a command. * @param command - the received command object. */ - virtual void onCommand( commands::Command* command ){ + virtual void onCommand( const Pointer& command ){ fire( command ); } @@ -128,7 +125,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){ next->oneway( command ); @@ -140,7 +137,7 @@ * @throws CommandIOException * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){ return next->request( command ); @@ -153,7 +150,7 @@ * @throws CommandIOException * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){ return next->request( command, timeout ); @@ -171,7 +168,7 @@ * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ - virtual void setWireFormat( wireformat::WireFormat* wireFormat ) { + virtual void setWireFormat( const Pointer& wireFormat ) { next->setWireFormat( wireFormat ); } @@ -203,6 +200,7 @@ */ virtual void close() throw( cms::CMSException ){ next->close(); + next.reset( NULL ); } /** Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h Thu Feb 19 01:06:07 2009 @@ -19,17 +19,19 @@ #define _ACTIVEMQ_TRANSPORT_TRANSPORTLISTENER_H_ #include +#include #include +#include namespace activemq{ -namespace commands{ - class Command; -} namespace transport{ // Forward declarations. class Transport; + using decaf::lang::Pointer; + using activemq::commands::Command; + /** * A listener of asynchronous exceptions from a command transport object. */ @@ -46,7 +48,7 @@ * * @param command the received command object. */ - virtual void onCommand( commands::Command* command ) = 0; + virtual void onCommand( const Pointer& command ) = 0; /** * Event handler for an exception from a command transport. Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/FutureResponse.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/FutureResponse.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/FutureResponse.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/FutureResponse.h Thu Feb 19 01:06:07 2009 @@ -19,6 +19,7 @@ #define _ACTIVEMQ_TRANSPORT_CORRELATOR_FUTURERESPONSE_H_ #include +#include #include #include #include @@ -29,6 +30,9 @@ namespace transport{ namespace correlator{ + using decaf::lang::Pointer; + using activemq::commands::Response; + /** * A container that holds a response object. Callers of the getResponse * method will block until a response has been receive unless they call @@ -38,13 +42,11 @@ private: mutable decaf::util::concurrent::CountDownLatch responseLatch; - commands::Response* response; + Pointer response; public: - FutureResponse() : responseLatch( 1 ) { - response = NULL; - } + FutureResponse() : responseLatch( 1 ) {} virtual ~FutureResponse(){} @@ -52,11 +54,11 @@ * Getters for the response property. Infinite Wait. * @return the response object for the request */ - virtual const commands::Response* getResponse() const{ + virtual const Pointer& getResponse() const { this->responseLatch.await(); return response; } - virtual commands::Response* getResponse(){ + virtual Pointer& getResponse() { this->responseLatch.await(); return response; } @@ -66,11 +68,11 @@ * @param timeout - time to wait in milliseconds * @return the response object for the request */ - virtual const commands::Response* getResponse( unsigned timeout ) const{ + virtual const Pointer& getResponse( unsigned int timeout ) const { this->responseLatch.await( timeout ); return response; } - virtual commands::Response* getResponse( unsigned int timeout ){ + virtual Pointer& getResponse( unsigned int timeout ) { this->responseLatch.await( timeout ); return response; } @@ -79,7 +81,7 @@ * Setter for the response property. * @param response the response object for the request. */ - virtual void setResponse( commands::Response* response ){ + virtual void setResponse( const Pointer& response ) { this->response = response; this->responseLatch.countDown(); } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Thu Feb 19 01:06:07 2009 @@ -16,21 +16,22 @@ */ #include "ResponseCorrelator.h" +#include +using namespace std; using namespace activemq; using namespace activemq::transport; using namespace activemq::transport::correlator; using namespace activemq::exceptions; +using namespace decaf; using namespace decaf::lang; using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// -ResponseCorrelator::ResponseCorrelator( Transport* next, bool own ) - : TransportFilter( next, own ) { +ResponseCorrelator::ResponseCorrelator( const Pointer& next ) + : TransportFilter( next ), closed( true ) { nextCommandId.set(1); - // Start in the closed state. - closed = true; } //////////////////////////////////////////////////////////////////////////////// @@ -38,13 +39,10 @@ // Close the transport and destroy it. close(); - - // Don't do anything with the future responses - - // they should be cleaned up by each requester. } //////////////////////////////////////////////////////////////////////////////// -void ResponseCorrelator::oneway( commands::Command* command ) +void ResponseCorrelator::oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { try{ @@ -66,25 +64,25 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* ResponseCorrelator::request( commands::Command* command ) +Pointer ResponseCorrelator::request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { try{ + command->setCommandId( nextCommandId.getAndIncrement() ); command->setResponseRequired( true ); // Add a future response object to the map indexed by this // command id. - // TODO = This might not get deleted if an exception is thrown. - FutureResponse* futureResponse = new FutureResponse(); + Pointer futureResponse( new FutureResponse() ); synchronized( &mapMutex ){ - requestMap[command->getCommandId()] = futureResponse; + requestMap.insert( make_pair( command->getCommandId(), futureResponse ) ); } // Wait to be notified of the response via the futureResponse // object. - commands::Response* response = NULL; + Pointer response; // Send the request. next->oneway( command ); @@ -98,13 +96,6 @@ // We've done our waiting - get this thing out // of the map. requestMap.erase( command->getCommandId() ); - - // Destroy the futureResponse. It is safe to - // do this now because the other thread only - // accesses the futureResponse within a lock on - // the map. - delete futureResponse; - futureResponse = NULL; } if( response == NULL ){ @@ -124,7 +115,7 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* ResponseCorrelator::request( commands::Command* command, unsigned int timeout ) +Pointer ResponseCorrelator::request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { try{ @@ -133,16 +124,15 @@ // Add a future response object to the map indexed by this // command id. - // TODO = This might not get deleted if an exception is thrown. - FutureResponse* futureResponse = new FutureResponse(); + Pointer futureResponse( new FutureResponse() ); synchronized( &mapMutex ){ - requestMap[command->getCommandId()] = futureResponse; + requestMap.insert( make_pair( command->getCommandId(), futureResponse ) ); } // Wait to be notified of the response via the futureResponse // object. - commands::Response* response = NULL; + Pointer response; // Send the request. next->oneway( command ); @@ -156,13 +146,6 @@ // We've done our waiting - get this thing out // of the map. requestMap.erase( command->getCommandId() ); - - // Destroy the futureResponse. It is safe to - // do this now because the other thread only - // accesses the futureResponse within a lock on - // the map. - delete futureResponse; - futureResponse = NULL; } if( response == NULL ){ @@ -182,7 +165,7 @@ } //////////////////////////////////////////////////////////////////////////////// -void ResponseCorrelator::onCommand( commands::Command* command ) { +void ResponseCorrelator::onCommand( const Pointer& command ) { // Let's see if the incoming command is a response. if( !command->isResponse() ){ @@ -192,25 +175,26 @@ return; } - commands::Response* response = dynamic_cast( command ); + Pointer response = + command.dynamicCast< Response, Pointer::CounterType >(); // It is a response - let's correlate ... synchronized( &mapMutex ){ // Look the future request up based on the correlation id. - std::map::iterator iter = + std::map< unsigned int, Pointer >::iterator iter = requestMap.find( response->getCorrelationId() ); if( iter == requestMap.end() ){ // This is not terrible - just log it. - //printf("ResponseCorrelator::onCommand() - received unknown response for request: %d\n", - // response->getCorrelationId() ); + //printf( "ResponseCorrelator::onCommand() - " + // "received unknown response for request: %d\n", + // response->getCorrelationId() ); return; } // Get the future response (if it's in the map, it's not NULL). - FutureResponse* futureResponse = NULL; - futureResponse = iter->second; + Pointer futureResponse = iter->second; // Set the response property in the future response. futureResponse->setResponse( response ); @@ -257,9 +241,9 @@ // Wake-up any outstanding requests. synchronized( &mapMutex ){ - std::map::iterator iter = requestMap.begin(); + std::map >::iterator iter = requestMap.begin(); for( ; iter != requestMap.end(); ++iter ){ - iter->second->setResponse( NULL ); + iter->second->setResponse( Pointer() ); } } @@ -275,16 +259,15 @@ } //////////////////////////////////////////////////////////////////////////////// -void ResponseCorrelator::onTransportException( - Transport* source AMQCPP_UNUSED, - const decaf::lang::Exception& ex ) { +void ResponseCorrelator::onTransportException( Transport* source AMQCPP_UNUSED, + const decaf::lang::Exception& ex ) { // Trigger each outstanding request to complete so that we don't hang // forever waiting for one that has been sent without timeout. synchronized( &mapMutex ){ - std::map::iterator iter = requestMap.begin(); + std::map< unsigned int, Pointer >::iterator iter = requestMap.begin(); for( ; iter != requestMap.end(); ++iter ){ - iter->second->setResponse( NULL ); + iter->second->setResponse( Pointer() ); } } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.h Thu Feb 19 01:06:07 2009 @@ -33,6 +33,10 @@ namespace transport{ namespace correlator{ + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; + /** * This type of transport filter is responsible for correlating * asynchronous responses with requests. Non-response messages @@ -50,7 +54,7 @@ /** * Map of request ids to future response objects. */ - std::map requestMap; + std::map > requestMap; /** * Sync object for accessing the request map. @@ -67,9 +71,8 @@ /** * Constructor. * @param next the next transport in the chain - * @param own indicates if this transport owns the next */ - ResponseCorrelator( Transport* next, bool own = true ); + ResponseCorrelator( const Pointer& next ); virtual ~ResponseCorrelator(); @@ -82,7 +85,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -92,7 +95,7 @@ * @return the response from the server. * @throws CommandIOException if an error occurs with the request. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -103,7 +106,7 @@ * @return the response from the server. * @throws CommandIOException if an error occurs with the request. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -115,7 +118,7 @@ * the command listener. * @param command the received from the nested transport. */ - virtual void onCommand( commands::Command* command ); + virtual void onCommand( const Pointer& command ); /** * Starts this transport object and creates the thread for Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp Thu Feb 19 01:06:07 2009 @@ -24,10 +24,9 @@ using namespace activemq::transport::failover; //////////////////////////////////////////////////////////////////////////////// -BackupTransport::BackupTransport( FailoverTransport* failover ) : failover( failover ) { +BackupTransport::BackupTransport( FailoverTransport* failover ) : + failover( failover ), closed( true ) { - this->transport = NULL; - this->closed = false; } //////////////////////////////////////////////////////////////////////////////// Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Thu Feb 19 01:06:07 2009 @@ -23,12 +23,15 @@ #include #include #include +#include #include namespace activemq { namespace transport { namespace failover { + using decaf::lang::Pointer; + class FailoverTransport; class AMQCPP_API BackupTransport : public DefaultTransportListener { @@ -38,7 +41,7 @@ FailoverTransport* failover; // The Transport this one is managing. - Transport* transport; + Pointer transport; // The URI of this Backup decaf::net::URI uri; @@ -71,7 +74,7 @@ * Gets the currently held transport * @returns pointer to the held transport or NULL if not set. */ - Transport* getTransport() { + const Pointer& getTransport() { return transport; } @@ -82,7 +85,7 @@ * @param transport * The transport to hold. */ - void setTransport( Transport* transport ) { + void setTransport( const Pointer& transport ) { this->transport = transport; if( this->transport != NULL ) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Thu Feb 19 01:06:07 2009 @@ -62,14 +62,14 @@ } //////////////////////////////////////////////////////////////////////////////// -void FailoverTransport::oneway( commands::Command* command ) +void FailoverTransport::oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { } //////////////////////////////////////////////////////////////////////////////// -commands::Response* FailoverTransport::request( commands::Command* command AMQCPP_UNUSED ) +Pointer FailoverTransport::request( const Pointer& command AMQCPP_UNUSED ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { @@ -78,7 +78,7 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* FailoverTransport::request( commands::Command* command AMQCPP_UNUSED, +Pointer FailoverTransport::request( const Pointer& command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Thu Feb 19 01:06:07 2009 @@ -21,17 +21,24 @@ #include #include +#include #include +#include #include +#include #include +#include +#include #include namespace activemq { namespace transport { namespace failover { - class BackupTransport; + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; class AMQCPP_API FailoverTransport : public CompositeTransport { private: @@ -40,6 +47,11 @@ bool connected; bool started; + decaf::net::URI connectedTransportURI; + decaf::net::URI failedConnectTransportURI; + decaf::util::concurrent::atomic::AtomicReference connectedTransport; + //TaskRunner reconnectTask; + decaf::util::StlSet uris; long long timeout; @@ -58,9 +70,19 @@ bool trackMessages; int maxCacheSize; - //List backups=new CopyOnWriteArrayList(); + decaf::util::StlSet< Pointer > backups; decaf::lang::Exception connectionFailure; + state::ConnectionStateTracker stateTracker; + decaf::util::concurrent::Mutex reconnectMutex; + decaf::util::concurrent::Mutex backupMutex; + decaf::util::concurrent::Mutex sleepMutex; + decaf::util::concurrent::Mutex listenerMutex; + decaf::util::StlMap > requestMap; + + Pointer disposedListener; + Pointer myTansportListener; + public: FailoverTransport(); @@ -125,7 +147,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -138,7 +160,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -152,7 +174,7 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); @@ -160,7 +182,7 @@ * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ - virtual void setWireFormat( wireformat::WireFormat* wireFormat ) {} + virtual void setWireFormat( const Pointer& wireFormat ) {} /** * Sets the observer of asynchronous events from this transport. Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.cpp Thu Feb 19 01:06:07 2009 @@ -33,14 +33,15 @@ using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -Transport* FailoverTransportFactory::doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) - throw ( exceptions::ActiveMQException ) { +Pointer FailoverTransportFactory::doCreateComposite( + const decaf::net::URI& location, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) + throw ( exceptions::ActiveMQException ) { try { - std::auto_ptr transport( new FailoverTransport() ); + Pointer transport( new FailoverTransport() ); transport->setInitialReconnectDelay( Long::parseLong( properties.getProperty( "initialReconnectDelay", "10" ) ) ); @@ -63,7 +64,7 @@ transport->setMaxCacheSize( Integer::parseInt( properties.getProperty( "maxCacheSize", "131072" ) ) ); - return transport.release(); + return transport; } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransportFactory.h Thu Feb 19 01:06:07 2009 @@ -31,6 +31,8 @@ namespace transport { namespace failover { + using decaf::lang::Pointer; + /** * Creates an instance of a FailoverTransport. * @@ -45,14 +47,17 @@ /** * Creates a slimed down Transport instance which can be used in composite * transport instances. + * * @param location - URI location to connect to. * @param wireformat - the assigned WireFormat for the new Transport. * @param properties - Properties to apply to the transport. + * + * @return Pointer to a new FailoverTransport instance. * @throws ActiveMQexception if an error occurs */ - virtual Transport* doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) + virtual Pointer doCreateComposite( const decaf::net::URI& location, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) throw ( exceptions::ActiveMQException ); }; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.cpp Thu Feb 19 01:06:07 2009 @@ -26,15 +26,15 @@ using namespace decaf::lang::exceptions; //////////////////////////////////////////////////////////////////////////////// -LOGDECAF_INITIALIZE( logger, LoggingTransport, "activemq.transport.filters.LoggingTransport") +LOGDECAF_INITIALIZE( logger, LoggingTransport, "activemq.transport.logging.LoggingTransport") //////////////////////////////////////////////////////////////////////////////// -LoggingTransport::LoggingTransport( Transport* next, bool own ) - : TransportFilter( next, own ) +LoggingTransport::LoggingTransport( const Pointer& next ) + : TransportFilter( next ) {} //////////////////////////////////////////////////////////////////////////////// -void LoggingTransport::onCommand( commands::Command* command ) { +void LoggingTransport::onCommand( const Pointer& command ) { ostringstream ostream; ostream << "*** BEGIN RECEIVED ASYNCHRONOUS COMMAND ***" << endl; @@ -48,7 +48,7 @@ } //////////////////////////////////////////////////////////////////////////////// -void LoggingTransport::oneway( commands::Command* command ) +void LoggingTransport::oneway( const Pointer& command ) throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { try { @@ -70,13 +70,13 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* LoggingTransport::request( commands::Command* command ) +Pointer LoggingTransport::request( const Pointer& command ) throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { try { // Delegate to the base class. - commands::Response* response = TransportFilter::request( command ); + Pointer response = TransportFilter::request( command ); ostringstream ostream; ostream << "*** SENDING REQUEST COMMAND ***" << endl; @@ -95,13 +95,13 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* LoggingTransport::request( commands::Command* command, unsigned int timeout ) +Pointer LoggingTransport::request( const Pointer& command, unsigned int timeout ) throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { try { // Delegate to the base class. - commands::Response* response = TransportFilter::request( command, timeout ); + Pointer response = TransportFilter::request( command, timeout ); ostringstream ostream; ostream << "*** SENDING REQUEST COMMAND: Timeout = " << timeout << " ***" << endl; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/logging/LoggingTransport.h Thu Feb 19 01:06:07 2009 @@ -21,11 +21,14 @@ #include #include #include +#include namespace activemq{ namespace transport{ namespace logging{ + using decaf::lang::Pointer; + /** * A transport filter that logs commands as they are sent/received. */ @@ -39,9 +42,8 @@ /** * Constructor. * @param next - the next Transport in the chain - * @param own - true if this filter owns the next and should delete it */ - LoggingTransport( Transport* next, bool own = true ); + LoggingTransport( const Pointer& next ); virtual ~LoggingTransport() {} @@ -49,7 +51,7 @@ * Event handler for the receipt of a command. * @param command - the received command object. */ - virtual void onCommand( commands::Command* command ); + virtual void onCommand( const Pointer& command ); /** * Sends a one-way command. Does not wait for any response from the @@ -60,18 +62,18 @@ * @throws UnsupportedOperationException if this method is not implemented * by this transport. */ - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw( CommandIOException, - decaf::lang::exceptions::UnsupportedOperationException); + decaf::lang::exceptions::UnsupportedOperationException ); /** * Not supported by this class - throws an exception. * @param command the command that is sent as a request * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, - decaf::lang::exceptions::UnsupportedOperationException); + decaf::lang::exceptions::UnsupportedOperationException ); /** * Not supported by this class - throws an exception. @@ -79,9 +81,9 @@ * @param timeout the time to wait for a response. * @throws UnsupportedOperationException. */ - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer& command, unsigned int timeout ) throw( CommandIOException, - decaf::lang::exceptions::UnsupportedOperationException); + decaf::lang::exceptions::UnsupportedOperationException ); }; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp Thu Feb 19 01:06:07 2009 @@ -30,14 +30,13 @@ MockTransport* MockTransport::instance = NULL; //////////////////////////////////////////////////////////////////////////////// -MockTransport::MockTransport( WireFormat* wireFormat, ResponseBuilder* responseBuilder, bool own ){ +MockTransport::MockTransport( const Pointer& wireFormat, + const Pointer& responseBuilder ){ this->wireFormat = wireFormat; - this->responseBuilder = NULL; this->outgoingListener = NULL; this->listener = NULL; this->responseBuilder = responseBuilder; - this->own = own; this->nextCommandId.set( 0 ); this->instance = this; @@ -47,20 +46,7 @@ } //////////////////////////////////////////////////////////////////////////////// -MockTransport::~MockTransport(){ - try{ - - if( this->own ){ - delete this->responseBuilder; - } - - delete this->wireFormat; - } - AMQ_CATCHALL_NOTHROW() -} - -//////////////////////////////////////////////////////////////////////////////// -void MockTransport::oneway( commands::Command* command ) +void MockTransport::oneway( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { @@ -83,7 +69,7 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* MockTransport::request( commands::Command* command ) +Pointer MockTransport::request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { @@ -113,8 +99,8 @@ } //////////////////////////////////////////////////////////////////////////////// -commands::Response* MockTransport::request( commands::Command* command, - unsigned int timeout AMQCPP_UNUSED ) +Pointer MockTransport::request( const Pointer& command, + unsigned int timeout AMQCPP_UNUSED ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h Thu Feb 19 01:06:07 2009 @@ -41,6 +41,10 @@ namespace transport{ namespace mock{ + using decaf::lang::Pointer; + using activemq::commands::Command; + using activemq::commands::Response; + /** * The MockTransport defines a base level Transport class that is intended to * be used in place of an a regular protocol Transport suck as TCP. This @@ -71,7 +75,7 @@ * @param command - The command to build a response for * @return A Response object pointer, or NULL if no response. */ - virtual commands::Response* buildResponse( const commands::Command* command ) = 0; + virtual Pointer buildResponse( const Pointer& command ) = 0; /** * When called the ResponseBuilder must construct all the @@ -81,8 +85,8 @@ * @param queue - Queue of Command sent back from the broker. */ virtual void buildIncomingCommands( - const commands::Command* cmd, - decaf::util::StlQueue& queue ) = 0; + const Pointer& command, + decaf::util::StlQueue< Pointer >& queue ) = 0; }; @@ -100,16 +104,15 @@ private: MockTransport* transport; - ResponseBuilder* responseBuilder; + Pointer responseBuilder; bool done; decaf::util::concurrent::CountDownLatch startedLatch; - decaf::util::StlQueue inboundQueue; + decaf::util::StlQueue< Pointer > inboundQueue; public: InternalCommandListener() : startedLatch(1) { transport = NULL; - responseBuilder = NULL; done = false; this->start(); @@ -123,20 +126,18 @@ } this->join(); - while( !inboundQueue.empty() ) { - delete inboundQueue.pop(); - } + inboundQueue.clear(); } void setTransport( MockTransport* transport ){ this->transport = transport; } - void setResponseBuilder( ResponseBuilder* responseBuilder ) { + void setResponseBuilder( const Pointer& responseBuilder ) { this->responseBuilder = responseBuilder; } - virtual void onCommand( commands::Command* command ) { + virtual void onCommand( const Pointer& command ) { synchronized( &inboundQueue ) { // Create a response now before the caller has a @@ -177,20 +178,20 @@ private: - ResponseBuilder* responseBuilder; + Pointer responseBuilder; + Pointer wireFormat; TransportListener* outgoingListener; TransportListener* listener; - wireformat::WireFormat* wireFormat; decaf::util::concurrent::atomic::AtomicInteger nextCommandId; - bool own; InternalCommandListener internalListener; static MockTransport* instance; public: - MockTransport( wireformat::WireFormat* wireFormat, ResponseBuilder* responseBuilder, bool own = true ); + MockTransport( const Pointer& wireFormat, + const Pointer& responseBuilder ); - virtual ~MockTransport(); + virtual ~MockTransport() {} static MockTransport* getInstance() { return instance; @@ -202,19 +203,19 @@ * would have been sent Asynchronously be the Broker. * @param responseBuilder - The ResponseBuilder to use from now on. */ - void setResponseBuilder( ResponseBuilder* responseBuilder ){ + void setResponseBuilder( const Pointer& responseBuilder ){ this->responseBuilder = responseBuilder; } - virtual void oneway( commands::Command* command ) + virtual void oneway( const Pointer& command ) throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException); - virtual commands::Response* request( commands::Command* command ) + virtual Pointer request( const Pointer& command ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException); - virtual commands::Response* request( commands::Command* command, unsigned int timeout ) + virtual Pointer request( const Pointer&, unsigned int timeout ) throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException); @@ -232,7 +233,7 @@ * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ - virtual void setWireFormat( wireformat::WireFormat* wireFormat AMQCPP_UNUSED ) {} + virtual void setWireFormat( const Pointer& wireFormat AMQCPP_UNUSED ) {} virtual void setTransportListener( TransportListener* listener ) { this->listener = listener; @@ -243,7 +244,7 @@ * CommandListener if there is one. * @param command - Command to send to the Listener. */ - virtual void fireCommand( commands::Command* command ){ + virtual void fireCommand( const Pointer& command ){ if( listener != NULL ){ listener->onCommand( command ); } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp Thu Feb 19 01:06:07 2009 @@ -30,25 +30,26 @@ using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -Transport* MockTransportFactory::doCreateComposite( const decaf::net::URI& location AMQCPP_UNUSED, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) - throw ( exceptions::ActiveMQException ) { +Pointer MockTransportFactory::doCreateComposite( + const decaf::net::URI& location AMQCPP_UNUSED, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) + throw ( exceptions::ActiveMQException ) { try { std::string wireFormatName = properties.getProperty( "wireFormat", "stomp" ); - MockTransport::ResponseBuilder* builder = NULL; + Pointer builder; if( wireFormatName == "stomp" ) { -// builder = new wireformat::stomp::StompResponseBuilder(); +// builder.reset( new wireformat::stomp::StompResponseBuilder() ); } else if( wireFormatName == "openwire" ) { - builder = new wireformat::openwire::OpenWireResponseBuilder(); + builder.reset( new wireformat::openwire::OpenWireResponseBuilder() ); } - return new MockTransport( wireFormat, builder, true ); + return Pointer( new MockTransport( wireFormat, builder ) ); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.h Thu Feb 19 01:06:07 2009 @@ -25,6 +25,8 @@ namespace transport{ namespace mock{ + using decaf::lang::Pointer; + /** * Manufactures MockTransports, which are objects that * read from input streams and write to output streams. @@ -39,14 +41,17 @@ /** * Creates a slimed down Transport instance which can be used in composite * transport instances. + * * @param location - URI location to connect to. * @param wireformat - the assigned WireFormat for the new Transport. * @param properties - Properties to apply to the transport. + * + * @return Pointer to a new Transport instance. * @throws ActiveMQexception if an error occurs */ - virtual Transport* doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) + virtual Pointer doCreateComposite( const decaf::net::URI& location, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) throw ( exceptions::ActiveMQException ); }; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp Thu Feb 19 01:06:07 2009 @@ -36,16 +36,16 @@ //////////////////////////////////////////////////////////////////////////////// TcpTransport::TcpTransport( const decaf::net::URI& uri, const decaf::util::Properties& properties, - Transport* next, const bool own ) -: TransportFilter( next, own ) { + const Pointer& next ) +: TransportFilter( next ) { this->initialize( uri, properties ); } //////////////////////////////////////////////////////////////////////////////// TcpTransport::TcpTransport( const decaf::util::Properties& properties, - Transport* next, const bool own ) -: TransportFilter( next, own ) { + const Pointer& next ) +: TransportFilter( next ) { if( !properties.hasProperty( "transport.uri" ) ) { throw ActiveMQException( @@ -101,7 +101,7 @@ // Cast it to an IO transport so we can wire up the socket // input and output streams. - IOTransport* ioTransport = dynamic_cast( next ); + IOTransport* ioTransport = dynamic_cast( next.get() ); if( ioTransport == NULL ){ throw ActiveMQException( __FILE__, __LINE__, Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h Thu Feb 19 01:06:07 2009 @@ -18,13 +18,14 @@ #ifndef _ACTIVEMQ_TRANSPORT_TCP_TCPTRANSPORT_H_ #define _ACTIVEMQ_TRANSPORT_TCP_TCPTRANSPORT_H_ +#include +#include #include #include #include #include #include -#include -#include +#include #include #include #include @@ -35,6 +36,8 @@ namespace transport{ namespace tcp{ + using decaf::lang::Pointer; + /** * Implements a TCP/IP based transport filter, this transport * is meant to wrap an instance of an IOTransport. The lower @@ -73,8 +76,7 @@ * @param own indicates if this transport owns the next. */ TcpTransport( const decaf::util::Properties& properties, - Transport* next, - const bool own = true ); + const Pointer& next ); /** * Constructor @@ -85,8 +87,7 @@ */ TcpTransport( const decaf::net::URI& uri, const decaf::util::Properties& properties, - Transport* next, - const bool own = true ); + const Pointer& next ); virtual ~TcpTransport(); Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp?rev=745701&r1=745700&r2=745701&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransportFactory.cpp Thu Feb 19 01:06:07 2009 @@ -24,16 +24,19 @@ using namespace activemq::transport; using namespace activemq::transport::tcp; using namespace activemq::exceptions; +using namespace decaf; using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -Transport* TcpTransportFactory::doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, - const decaf::util::Properties& properties ) +Pointer TcpTransportFactory::doCreateComposite( const decaf::net::URI& location, + const Pointer& wireFormat, + const decaf::util::Properties& properties ) throw ( exceptions::ActiveMQException ) { try { - return new TcpTransport( location, properties, new IOTransport( wireFormat ) ); + return Pointer( + new TcpTransport( location, properties, + Pointer( new IOTransport( wireFormat ) ) ) ); } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )