Author: tabish
Date: Mon Aug 10 18:22:05 2009
New Revision: 802880
URL: http://svn.apache.org/viewvc?rev=802880&view=rev
Log:
Implements: https://issues.apache.org/activemq/browse/AMQCPP-258
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Mon Aug 10 18:22:05 2009
@@ -505,6 +505,17 @@
} else {
//LOGDECAF_WARN( logger, "Received an unknown command" );
}
+
+ synchronized( &transportListeners ) {
+
+ Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+ while( iter->hasNext() ) {
+ try{
+ iter->next()->onCommand( command );
+ } catch(...) {}
+ }
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -523,6 +534,17 @@
// Inform the user of the error.
fire( exceptions::ActiveMQException( ex ) );
+
+ synchronized( &transportListeners ) {
+
+ Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+ while( iter->hasNext() ) {
+ try{
+ iter->next()->onException( ex );
+ } catch(...) {}
+ }
+ }
}
AMQ_CATCH_RETHROW( ActiveMQException )
AMQ_CATCHALL_THROW( ActiveMQException )
@@ -538,6 +560,32 @@
iter->next()->clearMessagesInProgress();
}
}
+
+ synchronized( &transportListeners ) {
+
+ Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+ while( iter->hasNext() ) {
+ try{
+ iter->next()->transportInterrupted();
+ } catch(...) {}
+ }
+ }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::transportResumed() {
+
+    // Relay the resume event to every registered TransportListener while the
+    // listener set is held under its lock.  Anything thrown while fetching or
+    // invoking a listener is discarded so the loop carries on with the rest.
+    synchronized( &transportListeners ) {
+
+        Pointer< Iterator<TransportListener*> > listenerIter(
+            transportListeners.iterator() );
+
+        for( ; listenerIter->hasNext(); ) {
+            try{
+                TransportListener* listener = listenerIter->next();
+                listener->transportResumed();
+            } catch(...) {}
+        }
+    }
}
////////////////////////////////////////////////////////////////////////////////
@@ -625,3 +673,29 @@
return *( this->connectionInfo->getConnectionId() );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::addTransportListener( TransportListener* transportListener ) {
+
+    // A NULL listener is ignored; any other pointer is inserted into the set of
+    // active TransportListeners while that set's lock is held.
+    if( transportListener != NULL ) {
+        synchronized( &transportListeners ) {
+            transportListeners.add( transportListener );
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeTransportListener( TransportListener* transportListener ) {
+
+    // A NULL listener is ignored; any other pointer is taken out of the set of
+    // active TransportListeners while that set's lock is held.
+    if( transportListener != NULL ) {
+        synchronized( &transportListeners ) {
+            transportListeners.remove( transportListener );
+        }
+    }
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon
Aug 10 18:22:05 2009
@@ -31,6 +31,7 @@
#include <activemq/commands/LocalTransactionId.h>
#include <activemq/commands/WireFormatInfo.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/TransportListener.h>
#include <decaf/util/Properties.h>
#include <decaf/util/StlMap.h>
#include <decaf/util/StlSet.h>
@@ -118,6 +119,11 @@
decaf::util::StlSet<ActiveMQSession*> activeSessions;
/**
+ * Maintain the set of all registered transport listeners.
+ */
+ decaf::util::StlSet<transport::TransportListener*> transportListeners;
+
+ /**
* the registered exception listener
*/
cms::ExceptionListener* exceptionListener;
@@ -334,6 +340,29 @@
public: // TransportListener
/**
+ * Adds a transport listener so that a client can be notified of events in
+ * the underlying transport.  Clients are always notified after the event has
+ * been processed by the Connection class.  Clients should ensure that the
+ * registered listener does not block or take a long amount of time to execute
+ * in order to not degrade performance of this Connection.
+ *
+ * @param transportListener
+ * The TransportListener instance to add to this Connection's set of listeners
+ * to notify of Transport events.
+ */
+ void addTransportListener( transport::TransportListener* transportListener );
+
+ /**
+ * Removes a registered TransportListener from the Connection's set of Transport
+ * listeners; this listener will no longer receive any Transport related events.
+ * The caller is responsible for freeing the listener in all cases.
+ *
+ * @param transportListener
+ * The pointer to the TransportListener to remove from the set of listeners.
+ */
+ void removeTransportListener( transport::TransportListener* transportListener );
+
+ /**
* Event handler for the receipt of a non-response command from the
* transport.
* @param command the received command object.
@@ -351,6 +380,11 @@
*/
virtual void transportInterrupted();
+ /**
+ * The transport has resumed after an interruption
+ */
+ virtual void transportResumed();
+
public:
/**
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
Mon Aug 10 18:22:05 2009
@@ -317,11 +317,6 @@
return this->localTransactionIds.getNextSequenceId();
}
- /**
- * The transport has resumed after an interruption
- */
- virtual void transportResumed() {}
-
};
}}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
Mon Aug 10 18:22:05 2009
@@ -25,80 +25,65 @@
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/core/ActiveMQSession.h>
#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/transport/TransportListener.h>
#include <memory>
using namespace std;
+using namespace decaf::lang;
using namespace activemq;
using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::transport;
-//////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionFactoryTest::test1WithStomp()
-//{
-// try
-// {
-// std::string URI =
-// "mock://127.0.0.1:23232?wireFormat=stomp";
-//
-// ActiveMQConnectionFactory connectionFactory( URI );
-//
-// cms::Connection* connection =
-// connectionFactory.createConnection();
-//
-// CPPUNIT_ASSERT( connection != NULL );
-//
-// delete connection;
-//
-// return;
-// }
-// AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
-// AMQ_CATCHALL_NOTHROW( )
-//
-// CPPUNIT_ASSERT( false );
-//}
-//
-//////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionFactoryTest::test2WithStomp()
-//{
-// try
-// {
-// std::string URI = std::string() +
-// "mock://127.0.0.1:23232?wireFormat=stomp&"
-// "username=" + username + "&password=" + password +
-// "&client-id=" + clientId;
-//
-// ActiveMQConnectionFactory connectionFactory( URI );
-//
-// cms::Connection* connection =
-// connectionFactory.createConnection();
-// CPPUNIT_ASSERT( connection != NULL );
-//
-// ActiveMQConnection* amqConnection =
-// dynamic_cast< ActiveMQConnection* >( connection );
-// CPPUNIT_ASSERT( amqConnection != NULL );
-//
-// connector::Connector* connector =
-// dynamic_cast< connector::Connector* >(
-// amqConnection->getConnectionData()->getConnector() );
-// CPPUNIT_ASSERT( connector != NULL );
-//
-// CPPUNIT_ASSERT( username == connector->getUsername() );
-// CPPUNIT_ASSERT( password == connector->getPassword() );
-// CPPUNIT_ASSERT( clientId == connector->getClientId() );
-//
-// // Free the allocated connection object.
-// delete connection;
-//
-// return;
-// }
-// AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
-// AMQ_CATCHALL_NOTHROW( )
-//
-// CPPUNIT_ASSERT( false );
-//}
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    /**
+     * TransportListener used by testTransportListener to record which of the
+     * interruption / resumption callbacks were delivered to it.
+     *
+     * Both flags start out false so that an assertion on isInterrupted() or
+     * isResumed() can only succeed when the matching callback actually ran.
+     */
+    class MyTransportListener : public TransportListener {
+    private:
+
+        // Becomes true once transportInterrupted() has been invoked.
+        bool interrupted;
+
+        // Becomes true once transportResumed() has been invoked.
+        bool resumed;
+
+    public:
+
+        MyTransportListener() : interrupted( false ), resumed( false ) {
+        }
+
+        /**
+         * @return true if transportInterrupted() has been called on this listener.
+         */
+        bool isInterrupted() const {
+            return this->interrupted;
+        }
+
+        /**
+         * @return true if transportResumed() has been called on this listener.
+         */
+        bool isResumed() const {
+            return this->resumed;
+        }
+
+        /**
+         * Commands are not of interest to this listener; intentionally a no-op.
+         */
+        virtual void onCommand( const Pointer<Command>& command ) {
+
+        }
+
+        /**
+         * Exceptions are not of interest to this listener; intentionally a no-op.
+         */
+        virtual void onException( const decaf::lang::Exception& ex ) {
+
+        }
+
+        /**
+         * Records that an interruption event was received.
+         */
+        virtual void transportInterrupted() {
+            this->interrupted = true;
+        }
+
+        /**
+         * Records that a resumption event was received.
+         */
+        virtual void transportResumed() {
+            this->resumed = true;
+        }
+
+    };
+
+}}
////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionFactoryTest::test1WithOpenWire()
-{
+void ActiveMQConnectionFactoryTest::test1WithOpenWire() {
+
try
{
std::string URI =
@@ -202,3 +187,40 @@
CPPUNIT_ASSERT( false );
}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactoryTest::testTransportListener() {
+
+    // Failover URI over two mock transports with randomize=false; the first is
+    // created with failOnSendMessage=true.  NOTE(review): presumably this makes
+    // the send below fail on the first transport so that failover interrupts
+    // and then resumes on the second one - confirm against MockTransport.
+    std::string URI = "failover://(mock://localhost:61616?failOnSendMessage=true,"
+                      "mock://localhost:61618)?randomize=false";
+
+    MyTransportListener listener;
+
+    ActiveMQConnectionFactory connectionFactory( URI );
+
+    std::auto_ptr<cms::Connection> connection(
+        connectionFactory.createConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    // Each dynamic_cast result is verified before use so that an unexpected
+    // concrete type is reported as a test failure rather than a NULL dereference.
+    ActiveMQConnection* amqConnection =
+        dynamic_cast< ActiveMQConnection* >( connection.get() );
+    CPPUNIT_ASSERT( amqConnection != NULL );
+
+    amqConnection->addTransportListener( & listener );
+
+    std::auto_ptr<ActiveMQSession> session( dynamic_cast<ActiveMQSession*>(
+        amqConnection->createSession() ) );
+    CPPUNIT_ASSERT( session.get() != NULL );
+
+    std::auto_ptr<cms::Destination> destination( session->createTopic( "TEST" ) );
+
+    std::auto_ptr<ActiveMQProducer> producer( dynamic_cast<ActiveMQProducer*>(
+        session->createProducer( destination.get() ) ) );
+    CPPUNIT_ASSERT( producer.get() != NULL );
+
+    std::auto_ptr<cms::TextMessage> message( session->createTextMessage() );
+    producer->send( message.get() );
+
+    // Wait before inspecting the listener; presumably the transport events are
+    // delivered on another thread - TODO confirm.
+    Thread::sleep( 2000 );
+
+    CPPUNIT_ASSERT( listener.isInterrupted() );
+    CPPUNIT_ASSERT( listener.isResumed() );
+
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
Mon Aug 10 18:22:05 2009
@@ -27,12 +27,11 @@
class ActiveMQConnectionFactoryTest : public CppUnit::TestFixture
{
CPPUNIT_TEST_SUITE( ActiveMQConnectionFactoryTest );
-// CPPUNIT_TEST( test1WithStomp );
-// CPPUNIT_TEST( test2WithStomp );
CPPUNIT_TEST( test1WithOpenWire );
CPPUNIT_TEST( test2WithOpenWire );
CPPUNIT_TEST( testExceptionOnCreate );
CPPUNIT_TEST( testCreateWithURIOptions );
+ CPPUNIT_TEST( testTransportListener );
CPPUNIT_TEST_SUITE_END();
public:
@@ -48,12 +47,11 @@
}
virtual ~ActiveMQConnectionFactoryTest() {}
-// void test1WithStomp();
-// void test2WithStomp();
void test1WithOpenWire();
void test2WithOpenWire();
void testExceptionOnCreate();
void testCreateWithURIOptions();
+ void testTransportListener();
};
|