Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 69948 invoked from network); 26 Jan 2009 16:48:53 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 26 Jan 2009 16:48:53 -0000 Received: (qmail 50640 invoked by uid 500); 26 Jan 2009 16:48:53 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 50611 invoked by uid 500); 26 Jan 2009 16:48:53 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 50602 invoked by uid 99); 26 Jan 2009 16:48:53 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jan 2009 08:48:53 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 26 Jan 2009 16:48:51 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 8874D2388896; Mon, 26 Jan 2009 16:48:31 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r737745 - in /activemq/activemq-cpp/trunk/src: main/ main/activemq/core/ main/activemq/transport/ main/activemq/transport/correlator/ main/activemq/transport/failover/ main/activemq/transport/mock/ main/activemq/transport/tcp/ main/activemq... Date: Mon, 26 Jan 2009 16:48:30 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20090126164831.8874D2388896@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Mon Jan 26 16:48:29 2009 New Revision: 737745 URL: http://svn.apache.org/viewvc?rev=737745&view=rev Log: https://issues.apache.org/activemq/browse/AMQCPP-100 Simplify the Transport Listener interfaces into one interface, and then update all the dependencies, add a DefaultTransportListener that does nothing for the events. Add more bits to Failover Transport. Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h (with props) activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h - copied, changed from r737520, activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportExceptionListener.h Removed: activemq/activemq-cpp/trunk/src/main/activemq/transport/CommandListener.h activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportExceptionListener.h Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h Modified: activemq/activemq-cpp/trunk/src/main/Makefile.am URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/Makefile.am?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/Makefile.am (original) +++ activemq/activemq-cpp/trunk/src/main/Makefile.am Mon Jan 26 16:48:29 2009 @@ -43,6 +43,9 @@ activemq/core/ActiveMQProducer.cpp \ activemq/core/ActiveMQConnectionFactory.cpp \ activemq/core/ActiveMQSessionExecutor.cpp \ + activemq/transport/failover/BackupTransport.cpp \ + activemq/transport/failover/FailoverTransport.cpp \ + activemq/transport/failover/FailoverTransportFactory.cpp \ activemq/transport/TransportFilter.cpp \ activemq/transport/TransportRegistry.cpp \ activemq/transport/AbstractTransportFactory.cpp \ @@ -178,17 +181,20 @@ activemq/exceptions/ActiveMQException.h \ activemq/exceptions/BrokerException.h \ activemq/exceptions/ExceptionDefines.h \ - activemq/transport/Transport.h \ - activemq/transport/TransportFilter.h \ - activemq/transport/TransportFactory.h \ activemq/transport/AbstractTransportFactory.h \ - activemq/transport/TransportExceptionListener.h \ - activemq/transport/TransportRegistry.h \ - activemq/transport/Response.h \ activemq/transport/Command.h \ - activemq/transport/CommandListener.h \ activemq/transport/CommandIOException.h \ + activemq/transport/DefaultTransportListener.h \ activemq/transport/IOTransport.h \ + activemq/transport/Response.h \ + activemq/transport/Transport.h \ + activemq/transport/TransportListener.h \ + activemq/transport/TransportFilter.h \ + activemq/transport/TransportFactory.h \ + activemq/transport/TransportRegistry.h \ + activemq/transport/failover/BackupTransport.h \ + activemq/transport/failover/FailoverTransport.h \ + activemq/transport/failover/FailoverTransportFactory.h \ activemq/transport/mock/MockTransport.h \ activemq/transport/mock/MockTransportFactory.h \ activemq/transport/correlator/FutureResponse.h \ Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Mon Jan 26 16:48:29 2009 @@ -69,8 +69,7 @@ this->connectionMetaData.reset( new ActiveMQConnectionMetaData() ); // Register for messages and exceptions from the connector. - transport->setCommandListener( this ); - transport->setTransportExceptionListener( this ); + transport->setTransportListener( this ); // Now Start the Transport transport->start(); Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnectionSupport.h Mon Jan 26 16:48:29 2009 @@ -23,8 +23,7 @@ #include #include #include -#include -#include +#include #include #include @@ -36,8 +35,7 @@ namespace core { class AMQCPP_API ActiveMQConnectionSupport : - public transport::CommandListener, - public transport::TransportExceptionListener + public transport::TransportListener { private: Added: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h?rev=737745&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h (added) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h Mon Jan 26 16:48:29 2009 @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _ACTIVEMQ_TRANSPORT_DEFAULTTRANSPORTLISTENER_H_ +#define _ACTIVEMQ_TRANSPORT_DEFAULTTRANSPORTLISTENER_H_ + +#include +#include + +namespace activemq { +namespace transport { + + class AMQCPP_API DefaultTransportListener : public TransportListener { + public: + + virtual ~DefaultTransportListener() {} + + /** + * Event handler for the receipt of a command. The transport passes + * off all received commands to its listeners, the listener then owns + * the Object. If there is no registered listener the Transport deletes + * the command upon receipt. + * + * @param command the received command object. + */ + virtual void onCommand( Command* command AMQCPP_UNUSED ) {} + + /** + * Event handler for an exception from a command transport. + * + * @param source The source of the exception + * @param ex The exception. + */ + virtual void onTransportException( Transport* source AMQCPP_UNUSED, + const decaf::lang::Exception& ex AMQCPP_UNUSED ) {} + + /** + * The transport has suffered an interruption from which it hopes to recover + */ + virtual void transportInterrupted() {} + + /** + * The transport has resumed after an interruption + */ + virtual void transportResumed() {} + + }; + +}} + +#endif /*_ACTIVEMQ_TRANSPORT_DEFAULTTRANSPORTLISTENER_H_*/ Propchange: activemq/activemq-cpp/trunk/src/main/activemq/transport/DefaultTransportListener.h ------------------------------------------------------------------------------ svn:eol-style = native Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Mon Jan 26 16:48:29 2009 @@ -38,7 +38,6 @@ IOTransport::IOTransport(){ this->listener = NULL; - this->exceptionListener = NULL; this->inputStream = NULL; this->outputStream = NULL; this->closed = false; @@ -50,7 +49,6 @@ IOTransport::IOTransport( WireFormat* wireFormat ) { this->listener = NULL; - this->exceptionListener = NULL; this->inputStream = NULL; this->outputStream = NULL; this->closed = false; @@ -69,10 +67,10 @@ //////////////////////////////////////////////////////////////////////////////// void IOTransport::fire( decaf::lang::Exception& ex ){ - if( this->exceptionListener != NULL && !this->closed ){ + if( this->listener != NULL && !this->closed ){ try{ - this->exceptionListener->onTransportException( this, ex ); + this->listener->onTransportException( this, ex ); }catch( ... ){} } } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Mon Jan 26 16:48:29 2009 @@ -20,8 +20,7 @@ #include #include -#include -#include +#include #include #include @@ -57,19 +56,14 @@ private: /** - * Listener to incoming commands. - */ - CommandListener* listener; - - /** * WireFormat instance to use to Encode / Decode. */ wireformat::WireFormat* wireFormat; /** - * Listener of exceptions from this transport. + * Listener of this transport. */ - TransportExceptionListener* exceptionListener; + TransportListener* listener; /** * The input stream for incoming commands. @@ -153,14 +147,6 @@ throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ); /** - * Assigns the command listener for non-response commands. - * @param listener the listener. - */ - virtual void setCommandListener( CommandListener* listener ){ - this->listener = listener; - } - - /** * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ @@ -169,11 +155,11 @@ } /** - * Sets the observer of asynchronous exceptions from this transport. - * @param listener the listener of transport exceptions. + * Sets the observer of asynchronous events from this transport. + * @param listener the listener of transport events. */ - virtual void setTransportExceptionListener( TransportExceptionListener* listener ){ - this->exceptionListener = listener; + virtual void setTransportListener( TransportListener* listener ){ + this->listener = listener; } /** @@ -232,6 +218,35 @@ return NULL; } + + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return false; + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + return !this->closed; + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return this->closed; + } + }; }} Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Mon Jan 26 16:48:29 2009 @@ -36,8 +36,7 @@ namespace transport{ // Forward declarations. - class CommandListener; - class TransportExceptionListener; + class TransportListener; /** * Interface for a transport layer for command objects. Callers can @@ -96,23 +95,16 @@ decaf::lang::exceptions::UnsupportedOperationException ) = 0; /** - * Assigns the command listener for non-response commands. - * @param listener the listener. - */ - virtual void setCommandListener( CommandListener* listener ) = 0; - - /** * Sets the WireFormat instance to use. * @param WireFormat the object used to encode / decode commands. */ virtual void setWireFormat( wireformat::WireFormat* wireFormat ) = 0; /** - * Sets the observer of asynchronous exceptions from this transport. - * @param listener the listener of transport exceptions. + * Sets the observer of asynchronous events from this transport. + * @param listener the listener of transport events. */ - virtual void setTransportExceptionListener( - TransportExceptionListener* listener ) = 0; + virtual void setTransportListener( TransportListener* listener ) = 0; /** * Narrows down a Chain of Transports to a specific Transport to allow a @@ -125,6 +117,28 @@ */ virtual Transport* narrow( const std::type_info& typeId ) = 0; + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const = 0; + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const = 0; + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const = 0; + }; }} Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.cpp Mon Jan 26 16:48:29 2009 @@ -26,13 +26,10 @@ this->next = next; this->own = own; - - commandlistener = NULL; - exceptionListener = NULL; + this->listener = NULL; // Observe the nested transport for events. - next->setCommandListener( this ); - next->setTransportExceptionListener( this ); + next->setTransportListener( this ); } //////////////////////////////////////////////////////////////////////////////// Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Mon Jan 26 16:48:29 2009 @@ -21,9 +21,8 @@ #include #include #include -#include #include -#include +#include #include namespace activemq{ @@ -36,8 +35,7 @@ */ class AMQCPP_API TransportFilter : public Transport, - public CommandListener, - public TransportExceptionListener { + public TransportListener { protected: @@ -53,14 +51,9 @@ bool own; /** - * Listener to incoming commands. + * Listener of this transport. */ - CommandListener* commandlistener; - - /** - * Listener of exceptions from this transport. - */ - TransportExceptionListener* exceptionListener; + TransportListener* listener; protected: @@ -70,9 +63,9 @@ */ void fire( const decaf::lang::Exception& ex ){ - if( exceptionListener != NULL ){ + if( listener != NULL ){ try{ - exceptionListener->onTransportException( this, ex ); + listener->onTransportException( this, ex ); }catch( ... ){} } } @@ -83,8 +76,10 @@ */ void fire( Command* command ){ try{ - if( commandlistener != NULL ){ - commandlistener->onCommand( command ); + if( listener != NULL ){ + listener->onCommand( command ); + } else { + delete command; } }catch( ... ){} } @@ -167,19 +162,11 @@ } /** - * Assigns the command listener for non-response commands. - * @param listener the listener. - */ - virtual void setCommandListener( CommandListener* listener ){ - this->commandlistener = listener; - } - - /** * Sets the observer of asynchronous exceptions from this transport. * @param listener the listener of transport exceptions. */ - virtual void setTransportExceptionListener( TransportExceptionListener* listener ){ - this->exceptionListener = listener; + virtual void setTransportListener( TransportListener* listener ){ + this->listener = listener; } /** @@ -201,12 +188,7 @@ */ virtual void start() throw( cms::CMSException ) { - if( commandlistener == NULL ){ - throw exceptions::ActiveMQException( __FILE__, __LINE__, - "commandListener is invalid" ); - } - - if( exceptionListener == NULL ){ + if( listener == NULL ){ throw exceptions::ActiveMQException( __FILE__, __LINE__, "exceptionListener is invalid" ); } @@ -244,6 +226,34 @@ return NULL; } + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return next->isFaultTolerant(); + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + return next->isConnected(); + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return next->isClosed(); + } + }; }} Copied: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h (from r737520, activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportExceptionListener.h) URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h?p2=activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h&p1=activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportExceptionListener.h&r1=737520&r2=737745&rev=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportExceptionListener.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportListener.h Mon Jan 26 16:48:29 2009 @@ -15,28 +15,40 @@ * limitations under the License. */ -#ifndef _ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_ -#define _ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_ +#ifndef _ACTIVEMQ_TRANSPORT_TRANSPORTLISTENER_H_ +#define _ACTIVEMQ_TRANSPORT_TRANSPORTLISTENER_H_ -#include #include +#include namespace activemq{ namespace transport{ // Forward declarations. class Transport; + class Command; /** * A listener of asynchronous exceptions from a command transport object. */ - class AMQCPP_API TransportExceptionListener{ + class AMQCPP_API TransportListener{ public: - virtual ~TransportExceptionListener() {} + virtual ~TransportListener() {} + + /** + * Event handler for the receipt of a command. The transport passes + * off all received commands to its listeners, the listener then owns + * the Object. If there is no registered listener the Transport deletes + * the command upon receipt. + * + * @param command the received command object. + */ + virtual void onCommand( Command* command ) = 0; /** * Event handler for an exception from a command transport. + * * @param source The source of the exception * @param ex The exception. */ @@ -57,4 +69,4 @@ }} -#endif /*_ACTIVEMQ_TRANSPORT_TRANSPORTEXCEPTIONLISTENER_H_*/ +#endif /*_ACTIVEMQ_TRANSPORT_TRANSPORTLISTENER_H_*/ Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/correlator/ResponseCorrelator.cpp Mon Jan 26 16:48:29 2009 @@ -229,12 +229,7 @@ return; } - if( commandlistener == NULL ){ - throw exceptions::ActiveMQException( __FILE__, __LINE__, - "commandListener is invalid" ); - } - - if( exceptionListener == NULL ){ + if( listener == NULL ){ throw exceptions::ActiveMQException( __FILE__, __LINE__, "exceptionListener is invalid" ); } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.cpp Mon Jan 26 16:48:29 2009 @@ -17,16 +17,17 @@ #include "BackupTransport.h" +#include + using namespace activemq; using namespace activemq::transport; using namespace activemq::transport::failover; //////////////////////////////////////////////////////////////////////////////// -BackupTransport::BackupTransport( FailoverTransport* failover ) { +BackupTransport::BackupTransport( FailoverTransport* failover ) : failover( failover ) { - this->failover = failover; this->transport = NULL; - this->disposed = false; + this->closed = false; } //////////////////////////////////////////////////////////////////////////////// @@ -37,13 +38,9 @@ void BackupTransport::onTransportException( transport::Transport* source, const decaf::lang::Exception& ex ) { - this->disposed = true; -} - -//////////////////////////////////////////////////////////////////////////////// -void BackupTransport::transportInterrupted() { -} + this->closed = true; -//////////////////////////////////////////////////////////////////////////////// -void BackupTransport::transportResumed() { + if( this->failover != NULL ) { + this->failover->reconnect(); + } } Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/BackupTransport.h Mon Jan 26 16:48:29 2009 @@ -21,8 +21,9 @@ #include #include -#include +#include #include +#include namespace activemq { namespace transport { @@ -30,7 +31,7 @@ class FailoverTransport; - class AMQCPP_API BackupTransport : public TransportExceptionListener { + class AMQCPP_API BackupTransport : public DefaultTransportListener { private: // The parent Failover Transport @@ -40,28 +41,29 @@ Transport* transport; // The URI of this Backup - URI uri; + decaf::net::URI uri; // Indicates that the contained transport is not valid any longer. - bool disposed; + bool closed; public: BackupTransport( FailoverTransport* failover ); + virtual ~BackupTransport(); /** * Gets the URI assigned to this Backup * @return the assigned URI */ - URI getURI() const { + decaf::net::URI getURI() const { return this->uri; } /** * Sets the URI assigned to this Transport. */ - void setURI( const URI& uri ) { + void setURI( const decaf::net::URI& uri ) { this->uri = uri; } @@ -84,7 +86,7 @@ this->transport = transport; if( this->transport != NULL ) { - this->transport->setTransportExceptionListener( this ); + this->transport->setTransportListener( this ); } } @@ -97,14 +99,13 @@ const decaf::lang::Exception& ex ); /** - * The transport has suffered an interruption from which it hopes to recover - */ - virtual void transportInterrupted(); - - /** - * The transport has resumed after an interruption + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport */ - virtual void transportResumed(); + bool isClosed() const { + return this->closed; + } }; Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.cpp Mon Jan 26 16:48:29 2009 @@ -22,7 +22,24 @@ using namespace activemq::transport::failover; //////////////////////////////////////////////////////////////////////////////// -FailoverTransport::FailoverTransport() { +FailoverTransport::FailoverTransport( const decaf::net::URI& location, + wireformat::WireFormat* wireformat, + const decaf::util::Properties& properties ) { + + this->initialReconnectDelay = 10; + this->maxReconnectDelay = 1000 * 30; + this->backOffMultiplier = 2; + this->useExponentialBackOff = true; + this->randomize = true; + this->initialized = false; + this->maxReconnectAttempts = 0; + this->connectFailures = 0; + this->reconnectDelay = this->initialReconnectDelay; + this->firstConnection = true; + this->backup = false; + this->backupPoolSize = 1; + this->trackMessages = false; + this->maxCacheSize = 128 * 1024; } //////////////////////////////////////////////////////////////////////////////// Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/failover/FailoverTransport.h Mon Jan 26 16:48:29 2009 @@ -22,6 +22,10 @@ #include +#include +#include +#include + namespace activemq { namespace transport { namespace failover { @@ -29,11 +33,185 @@ class BackupTransport; class AMQCPP_API FailoverTransport : public CompositeTransport { + private: + + bool closed; + bool connected; + bool started; + + //decaf::util::Set uris; + + long long initialReconnectDelay; + long long maxReconnectDelay; + long long backOffMultiplier; + bool useExponentialBackOff; + bool randomize; + bool initialized; + int maxReconnectAttempts; + int connectFailures; + long long reconnectDelay; + decaf::lang::Exception connectionFailure; + bool firstConnection; + bool backup; + //List backups=new CopyOnWriteArrayList(); + int backupPoolSize; + bool trackMessages; + int maxCacheSize; + public: - FailoverTransport(); + FailoverTransport( const decaf::net::URI& location, + wireformat::WireFormat* wireformat, + const decaf::util::Properties& properties ); + virtual ~FailoverTransport(); + /** + * Indicates that the Transport needs to reconnect to another URI in its + * list. + */ + void reconnect(); + + public: // CompositeTransport methods + + /** + * Add a URI to the list of URI's that will represent the set of Transports + * that this Transport is a composite of. + * + * @param uri + * The new URI to add to the set this composite maintains. + */ + virtual void addURI( const decaf::net::URI& uri ); + + /** + * Remove a URI from the set of URI's that represents the set of Transports + * that this Transport is composed of, removing a URI for which the composite + * has created a connected Transport should result in that Transport being + * disposed of. + * + * @param uri + * The new URI to remove to the set this composite maintains. + */ + virtual void removeURI( const decaf::net::URI& uri ); + + public: // Transport Members + + /** + * Starts this transport object and creates the thread for + * polling on the input stream for commands. If this object + * has been closed, throws an exception. Before calling start, + * the caller must set the IO streams and the reader and writer + * objects. + * @throws CMSException if an error occurs or if this transport + * has already been closed. + */ + virtual void start() throw( cms::CMSException ); + + /** + * Stops the polling thread and closes the streams. This can + * be called explicitly, but is also called in the destructor. Once + * this object has been closed, it cannot be restarted. + * @throws CMSException if errors occur. + */ + virtual void close() throw( cms::CMSException ); + + /** + * Sends a one-way command. Does not wait for any response from the + * broker. + * @param command the command to be sent. + * @throws CommandIOException if an exception occurs during writing of + * the command. + * @throws UnsupportedOperationException if this method is not implemented + * by this transport. + */ + virtual void oneway( Command* command ) + throw( CommandIOException, + decaf::lang::exceptions::UnsupportedOperationException ); + + /** + * Sends the given command to the broker and then waits for the response. + * @param command the command to be sent. + * @return the response from the broker. + * @throws CommandIOException if an exception occurs during the read of the + * command. + * @throws UnsupportedOperationException if this method is not implemented + * by this transport. + */ + virtual Response* request( Command* command ) + throw( CommandIOException, + decaf::lang::exceptions::UnsupportedOperationException ); + + /** + * Sends the given command to the broker and then waits for the response. + * @param command - The command to be sent. + * @param timeout - The time to wait for this response. + * @return the response from the broker. + * @throws CommandIOException if an exception occurs during the read of the + * command. + * @throws UnsupportedOperationException if this method is not implemented + * by this transport. + */ + virtual Response* request( Command* command, unsigned int timeout ) + throw( CommandIOException, + decaf::lang::exceptions::UnsupportedOperationException ); + + /** + * Sets the WireFormat instance to use. + * @param WireFormat the object used to encode / decode commands. + */ + virtual void setWireFormat( wireformat::WireFormat* wireFormat ); + + /** + * Sets the observer of asynchronous events from this transport. + * @param listener the listener of transport events. + */ + virtual void setTransportListener( TransportListener* listener ); + + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return true; + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + return this->connected; + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return this->closed; + } + + /** + * Narrows down a Chain of Transports to a specific Transport to allow a + * higher level transport to skip intermediate Transports in certain + * circumstances. + * + * @param typeId - The type_info of the Object we are searching for. + * + * @return the requested Object. or NULL if its not in this chain. + */ + virtual Transport* narrow( const std::type_info& typeId ) { + if( typeid( *this ) == typeId ) { + return this; + } + + return NULL; + } + }; }}} Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.cpp Mon Jan 26 16:48:29 2009 @@ -33,9 +33,8 @@ bool own ){ this->responseBuilder = NULL; - this->commandListener = NULL; - this->outgoingCommandListener = NULL; - this->exceptionListener = NULL; + this->outgoingListener = NULL; + this->listener = NULL; this->responseBuilder = responseBuilder; this->own = own; this->nextCommandId.set( 0 ); @@ -68,8 +67,8 @@ internalListener.onCommand( command ); // Notify external Client of command that we "sent" - if( outgoingCommandListener != NULL ){ - outgoingCommandListener->onCommand( command ); + if( outgoingListener != NULL ){ + outgoingListener->onCommand( command ); return; } } @@ -90,8 +89,8 @@ if( responseBuilder != NULL ){ // Notify external Client of command that we "sent" - if( outgoingCommandListener != NULL ){ - outgoingCommandListener->onCommand( command ); + if( outgoingListener != NULL ){ + outgoingListener->onCommand( command ); } command->setCommandId( this->nextCommandId.incrementAndGet() ); Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransport.h Mon Jan 26 16:48:29 2009 @@ -21,8 +21,8 @@ #include #include #include -#include -#include +#include +#include #include #include @@ -68,7 +68,7 @@ * Given a Command, check if it requires a response and return the * appropriate Response that the Broker would send for this Command * @param command - The command to build a response for - * @return A Reponse object pointer, or NULL if no response. + * @return A Response object pointer, or NULL if no response. */ virtual Response* buildResponse( const Command* command ) = 0; @@ -89,10 +89,10 @@ * processes all outbound commands and sends responses that are * constructed by calling the Protocol provided ResponseBuilder * and getting a set of Commands to send back into the MockTransport - * as imcoming Commands and Responses. + * as incoming Commands and Responses. */ class InternalCommandListener : - public CommandListener, + public DefaultTransportListener, public decaf::lang::Thread { private: @@ -176,9 +176,8 @@ private: ResponseBuilder* responseBuilder; - CommandListener* commandListener; - CommandListener* outgoingCommandListener; - TransportExceptionListener* exceptionListener; + TransportListener* outgoingListener; + TransportListener* listener; decaf::util::concurrent::atomic::AtomicInteger nextCommandId; bool own; InternalCommandListener internalListener; @@ -216,18 +215,14 @@ throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException); - virtual void setCommandListener( CommandListener* listener ){ - this->commandListener = listener; - } - /** - * Sets a Command Listener that gets notified for every command that would + * Sets a Listener that gets notified for every command that would * have been sent by this transport to the Broker, this allows a client * to verify that its messages are making it to the wire. * @param listener - The CommandListener to notify for each message */ - virtual void setOutgoingCommandListener( CommandListener* listener ){ - outgoingCommandListener = listener; + virtual void setOutgoingListener( TransportListener* listener ){ + outgoingListener = listener; } /** @@ -236,10 +231,8 @@ */ virtual void setWireFormat( wireformat::WireFormat* wireFormat AMQCPP_UNUSED ) {} - virtual void setTransportExceptionListener( - TransportExceptionListener* listener ) - { - this->exceptionListener = listener; + virtual void setTransportListener( TransportListener* listener ) { + this->listener = listener; } /** @@ -248,8 +241,8 @@ * @param command - Command to send to the Listener. */ virtual void fireCommand( Command* command ){ - if( commandListener != NULL ){ - commandListener->onCommand( command ); + if( listener != NULL ){ + listener->onCommand( command ); } } @@ -259,8 +252,8 @@ * @param command - Command to send to the Listener. */ virtual void fireException( const exceptions::ActiveMQException& ex ){ - if( exceptionListener != NULL ){ - exceptionListener->onTransportException( this, ex ); + if( listener != NULL ){ + listener->onTransportException( this, ex ); } } @@ -285,6 +278,33 @@ return NULL; } + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return false; + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + return true; + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return false; + } }; }}} Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/mock/MockTransportFactory.cpp Mon Jan 26 16:48:29 2009 @@ -30,8 +30,8 @@ using namespace decaf::lang; //////////////////////////////////////////////////////////////////////////////// -Transport* MockTransportFactory::doCreateComposite( const decaf::net::URI& location, - wireformat::WireFormat* wireFormat, +Transport* MockTransportFactory::doCreateComposite( const decaf::net::URI& location AMQCPP_UNUSED, + wireformat::WireFormat* wireFormat AMQCPP_UNUSED, const decaf::util::Properties& properties ) throw ( exceptions::ActiveMQException ) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.cpp Mon Jan 26 16:48:29 2009 @@ -73,6 +73,8 @@ try { + this->closed = true; + // Close the socket. if( socket.get() != NULL ) { socket->close(); Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/transport/tcp/TcpTransport.h Mon Jan 26 16:48:29 2009 @@ -45,6 +45,11 @@ private: /** + * has close been called. + */ + bool closed; + + /** * Socket that this Transport Communicates with */ std::auto_ptr socket; @@ -91,6 +96,38 @@ */ virtual void close() throw( cms::CMSException ); + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return false; + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + if( this->socket.get() != NULL ) { + return this->socket->isConnected(); + } + + return false; + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return this->closed; + } + private: void initialize( const decaf::net::URI& uri, Modified: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp Mon Jan 26 16:48:29 2009 @@ -194,18 +194,11 @@ return; } - if( commandlistener == NULL ){ + if( listener == NULL ){ throw exceptions::ActiveMQException( __FILE__, __LINE__, "OpenWireFormatNegotiator::start - " - "commandListener is invalid" ); - } - - if( exceptionListener == NULL ){ - throw exceptions::ActiveMQException( - __FILE__, __LINE__, - "OpenWireFormatNegotiator::start - " - "exceptionListener is invalid" ); + "TransportListener is invalid" ); } if( next == NULL ){ Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp (original) +++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.cpp Mon Jan 26 16:48:29 2009 @@ -259,7 +259,7 @@ CPPUNIT_ASSERT( false ); } - transport->setCommandListener( &cmdListener ); + transport->setTransportListener( &cmdListener ); ActiveMQConnection connection( transport, properties ); Modified: activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h (original) +++ activemq/activemq-cpp/trunk/src/test/activemq/core/ActiveMQConnectionTest.h Mon Jan 26 16:48:29 2009 @@ -26,7 +26,7 @@ #include #include -#include +#include #include #include @@ -46,7 +46,7 @@ ActiveMQConnectionTest() {}; virtual ~ActiveMQConnectionTest() {} - class MyCommandListener : public transport::CommandListener{ + class MyCommandListener : public transport::DefaultTransportListener{ public: transport::Command* cmd; Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp (original) +++ activemq/activemq-cpp/trunk/src/test/activemq/transport/IOTransportTest.cpp Mon Jan 26 16:48:29 2009 @@ -18,9 +18,8 @@ #include "IOTransportTest.h" #include -#include #include -#include +#include #include #include #include @@ -59,32 +58,6 @@ }; //////////////////////////////////////////////////////////////////////////////// -class MyCommandListener : public CommandListener{ -private: - - decaf::util::concurrent::CountDownLatch latch; - -public: - - MyCommandListener() : latch(1) {} - MyCommandListener( unsigned int num ) : latch( num ) {} - - virtual ~MyCommandListener(){} - - virtual void await() { - latch.await(); - } - - std::string str; - virtual void onCommand( Command* command ){ - const MyCommand* cmd = dynamic_cast(command); - str += cmd->c; - delete command; - latch.countDown(); - } -}; - -//////////////////////////////////////////////////////////////////////////////// class MyWireFormat : public wireformat::WireFormat { public: @@ -178,16 +151,31 @@ }; //////////////////////////////////////////////////////////////////////////////// -class MyExceptionListener : public TransportExceptionListener{ +class MyTransportListener : public TransportListener{ +private: + + decaf::util::concurrent::CountDownLatch latch; + public: Transport* transport; decaf::util::concurrent::Mutex mutex; - MyExceptionListener(){ - transport = NULL; + MyTransportListener() : latch(1) { this->transport = NULL; } + MyTransportListener( unsigned int num ) : latch( num ) { this->transport = NULL; } + virtual ~MyTransportListener(){} + + virtual void await() { + latch.await(); + } + + std::string str; + virtual void onCommand( Command* command ){ + const MyCommand* cmd = dynamic_cast(command); + str += cmd->c; + delete command; + latch.countDown(); } - virtual ~MyExceptionListener(){} virtual void onTransportException( Transport* source, const decaf::lang::Exception& ex AMQCPP_UNUSED){ @@ -219,12 +207,10 @@ decaf::io::ByteArrayOutputStream os; decaf::io::DataInputStream input( &is ); decaf::io::DataOutputStream output( &os ); - MyCommandListener listener; + MyTransportListener listener; MyWireFormat wireFormat; - MyExceptionListener exListener; IOTransport transport( &wireFormat ); - transport.setCommandListener( &listener ); - transport.setTransportExceptionListener( &exListener ); + transport.setTransportListener( &listener ); transport.setInputStream( &input ); transport.setOutputStream( &output ); @@ -244,14 +230,12 @@ decaf::io::DataOutputStream output( &bos ); for( int i = 0; i < 50; ++i ) { - MyCommandListener listener; MyWireFormat wireFormat; - MyExceptionListener exListener; + MyTransportListener listener; IOTransport transport; - transport.setCommandListener( &listener ); transport.setWireFormat( &wireFormat ); - transport.setTransportExceptionListener( &exListener ); + transport.setTransportListener( &listener ); transport.setInputStream( &input ); transport.setOutputStream( &output ); @@ -277,14 +261,12 @@ decaf::io::DataInputStream input( &is ); decaf::io::DataOutputStream output( &os ); - MyCommandListener listener(10); MyWireFormat wireFormat; - MyExceptionListener exListener; + MyTransportListener listener(10); IOTransport transport; - transport.setCommandListener( &listener ); transport.setInputStream( &input ); transport.setOutputStream( &output ); - transport.setTransportExceptionListener( &exListener ); + transport.setTransportListener( &listener ); transport.setWireFormat( &wireFormat ); transport.start(); @@ -315,14 +297,12 @@ decaf::io::DataInputStream input( &is ); decaf::io::DataOutputStream output( &os ); - MyCommandListener listener; MyWireFormat wireFormat; - MyExceptionListener exListener; + MyTransportListener listener; IOTransport transport; - transport.setCommandListener( &listener ); transport.setInputStream( &input ); transport.setOutputStream( &output ); - transport.setTransportExceptionListener( &exListener ); + transport.setTransportListener( &listener ); transport.setWireFormat( &wireFormat ); transport.start(); @@ -359,15 +339,13 @@ decaf::io::DataInputStream input( &is ); decaf::io::DataOutputStream output( &os ); - MyCommandListener listener; MyWireFormat wireFormat; - MyExceptionListener exListener; + MyTransportListener listener; IOTransport transport; - transport.setCommandListener( &listener ); wireFormat.throwException = true; transport.setInputStream( &input ); transport.setOutputStream( &output ); - transport.setTransportExceptionListener( &exListener ); + transport.setTransportListener( &listener ); transport.setWireFormat( &wireFormat ); unsigned char buffer[1] = { '1' }; @@ -381,15 +359,15 @@ transport.start(); - synchronized(&exListener.mutex) + synchronized(&listener.mutex) { - if(exListener.transport != &transport) + if(listener.transport != &transport) { - exListener.mutex.wait(1000); + listener.mutex.wait(1000); } } - CPPUNIT_ASSERT( exListener.transport == &transport ); + CPPUNIT_ASSERT( listener.transport == &transport ); transport.close(); } Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp (original) +++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.cpp Mon Jan 26 16:48:29 2009 @@ -29,10 +29,8 @@ MyListener listener; MyTransport transport; ResponseCorrelator correlator( &transport, false ); - correlator.setCommandListener( &listener ); - correlator.setTransportExceptionListener( &listener ); + correlator.setTransportListener( &listener ); CPPUNIT_ASSERT( transport.listener == &correlator ); - CPPUNIT_ASSERT( transport.exListener == &correlator ); // Give the thread a little time to get up and running. synchronized(&transport.startedMutex) @@ -74,10 +72,8 @@ MyListener listener; MyTransport transport; ResponseCorrelator correlator( &transport, false ); - correlator.setCommandListener( &listener ); - correlator.setTransportExceptionListener( &listener ); + correlator.setTransportListener( &listener ); CPPUNIT_ASSERT( transport.listener == &correlator ); - CPPUNIT_ASSERT( transport.exListener == &correlator ); // Give the thread a little time to get up and running. synchronized(&transport.startedMutex) @@ -116,10 +112,8 @@ MyListener listener; MyBrokenTransport transport; ResponseCorrelator correlator( &transport, false ); - correlator.setCommandListener( &listener ); - correlator.setTransportExceptionListener( &listener ); + correlator.setTransportListener( &listener ); CPPUNIT_ASSERT( transport.listener == &correlator ); - CPPUNIT_ASSERT( transport.exListener == &correlator ); // Give the thread a little time to get up and running. synchronized(&transport.startedMutex) @@ -162,10 +156,8 @@ MyListener listener; MyTransport transport; ResponseCorrelator correlator( &transport, false ); - correlator.setCommandListener( &listener ); - correlator.setTransportExceptionListener( &listener ); + correlator.setTransportListener( &listener ); CPPUNIT_ASSERT( transport.listener == &correlator ); - CPPUNIT_ASSERT( transport.exListener == &correlator ); // Start the transport. correlator.start(); Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h?rev=737745&r1=737744&r2=737745&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h (original) +++ activemq/activemq-cpp/trunk/src/test/activemq/transport/correlator/ResponseCorrelatorTest.h Mon Jan 26 16:48:29 2009 @@ -21,11 +21,12 @@ #include #include +#include +#include #include #include #include #include -#include #include namespace activemq{ @@ -122,8 +123,7 @@ public Transport, public decaf::lang::Runnable{ public: - CommandListener* listener; - TransportExceptionListener* exListener; + TransportListener* listener; decaf::lang::Thread* thread; decaf::util::concurrent::Mutex mutex; decaf::util::concurrent::Mutex startedMutex; @@ -134,7 +134,6 @@ MyTransport(){ listener = NULL; - exListener = NULL; thread = NULL; done = false; } @@ -172,18 +171,12 @@ "stuff" ); } - virtual void setCommandListener( CommandListener* listener ){ - this->listener = listener; - } - virtual void setWireFormat( wireformat::WireFormat* wireFormat ) { } - virtual void setTransportExceptionListener( - TransportExceptionListener* listener ) - { - this->exListener = listener; + virtual void setTransportListener( TransportListener* listener ) { + this->listener = listener; } virtual void start() throw( cms::CMSException ){ @@ -259,14 +252,14 @@ } } }catch( exceptions::ActiveMQException& ex ){ - if( exListener ){ - exListener->onTransportException( this, ex ); + if( listener ){ + listener->onTransportException( this, ex ); } } catch( ... ){ - if( exListener ){ + if( listener ){ exceptions::ActiveMQException ex( __FILE__, __LINE__, "stuff" ); - exListener->onTransportException( this, ex ); + listener->onTransportException( this, ex ); } } } @@ -278,6 +271,35 @@ return NULL; } + + /** + * Is this Transport fault tolerant, meaning that it will reconnect to + * a broker on disconnect. + * + * @returns true if the Transport is fault tolerant. + */ + virtual bool isFaultTolerant() const { + return true; + } + + /** + * Is the Transport Connected to its Broker. + * + * @returns true if a connection has been made. + */ + virtual bool isConnected() const { + return false; + } + + /** + * Has the Transport been shutdown and no longer usable. + * + * @returns true if the Transport + */ + virtual bool isClosed() const { + return false; + } + }; class MyBrokenTransport : public MyTransport{ @@ -292,11 +314,7 @@ } }; - class MyListener - : - public CommandListener, - public TransportExceptionListener{ - + class MyListener : public DefaultTransportListener { public: int exCount; @@ -327,16 +345,6 @@ } } - /** - * The transport has suffered an interruption from which it hopes to recover - */ - virtual void transportInterrupted() {} - - /** - * The transport has resumed after an interruption - */ - virtual void transportResumed() {} - }; class RequestThread : public decaf::lang::Thread{