activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r802880 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ test/activemq/core/
Date Mon, 10 Aug 2009 18:22:06 GMT
Author: tabish
Date: Mon Aug 10 18:22:05 2009
New Revision: 802880

URL: http://svn.apache.org/viewvc?rev=802880&view=rev
Log:
Implements: https://issues.apache.org/activemq/browse/AMQCPP-258

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Mon Aug 10 18:22:05 2009
@@ -505,6 +505,17 @@
         } else {
             //LOGDECAF_WARN( logger, "Received an unknown command" );
         }
+
+        synchronized( &transportListeners ) {
+
+            Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+            while( iter->hasNext() ) {
+                try{
+                    iter->next()->onCommand( command );
+                } catch(...) {}
+            }
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -523,6 +534,17 @@
 
         // Inform the user of the error.
         fire( exceptions::ActiveMQException( ex ) );
+
+        synchronized( &transportListeners ) {
+
+            Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+            while( iter->hasNext() ) {
+                try{
+                    iter->next()->onException( ex );
+                } catch(...) {}
+            }
+        }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -538,6 +560,32 @@
             iter->next()->clearMessagesInProgress();
         }
     }
+
+    synchronized( &transportListeners ) {
+
+        Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+        while( iter->hasNext() ) {
+            try{
+                iter->next()->transportInterrupted();
+            } catch(...) {}
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::transportResumed() {
+
+    synchronized( &transportListeners ) {
+
+        Pointer< Iterator<TransportListener*> > iter( transportListeners.iterator()
);
+
+        while( iter->hasNext() ) {
+            try{
+                iter->next()->transportResumed();
+            } catch(...) {}
+        }
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -625,3 +673,29 @@
 
     return *( this->connectionInfo->getConnectionId() );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::addTransportListener( TransportListener* transportListener ) {
+
+    if( transportListener == NULL ) {
+        return;
+    }
+
+    // Add this listener from the set of active TransportListeners
+    synchronized( &transportListeners ) {
+        transportListeners.add( transportListener );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::removeTransportListener( TransportListener* transportListener )
{
+
+    if( transportListener == NULL ) {
+        return;
+    }
+
+    // Remove this listener from the set of active TransportListeners
+    synchronized( &transportListeners ) {
+        transportListeners.remove( transportListener );
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon
Aug 10 18:22:05 2009
@@ -31,6 +31,7 @@
 #include <activemq/commands/LocalTransactionId.h>
 #include <activemq/commands/WireFormatInfo.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/transport/TransportListener.h>
 #include <decaf/util/Properties.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlSet.h>
@@ -118,6 +119,11 @@
         decaf::util::StlSet<ActiveMQSession*> activeSessions;
 
         /**
+         * Maintain the set of all active sessions.
+         */
+        decaf::util::StlSet<transport::TransportListener*> transportListeners;
+
+        /**
          * the registered exception listener
          */
         cms::ExceptionListener* exceptionListener;
@@ -334,6 +340,29 @@
     public: // TransportListener
 
         /**
+         * Adds a transport listener so that a client can be notified of events in
+         * the underlying transport, client's are always notified after the event has
+         * been processed by the Connection class.  Client's should ensure that the
+         * registered listener does not block or take a long amount of time to execute
+         * in order to not degrade performance of this Connection.
+         *
+         * @param transportListener
+         *      The TransportListener instance to add to this Connection's set of listeners
+         *      to notify of Transport events.
+         */
+        void addTransportListener( transport::TransportListener* transportListener );
+
+        /**
+         * Removes a registered TransportListener from the Connection's set of Transport
+         * listeners, this listener will no longer receive any Transport related events.
 The
+         * caller is responsible for freeing the listener in all cases.
+         *
+         * @param transportListener
+         *      The pointer to the TransportListener to remove from the set of listeners.
+         */
+        void removeTransportListener( transport::TransportListener* transportListener );
+
+        /**
          * Event handler for the receipt of a non-response command from the
          * transport.
          * @param command the received command object.
@@ -351,6 +380,11 @@
          */
         virtual void transportInterrupted();
 
+        /**
+         * The transport has resumed after an interruption
+         */
+        virtual void transportResumed();
+
     public:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionSupport.h
Mon Aug 10 18:22:05 2009
@@ -317,11 +317,6 @@
             return this->localTransactionIds.getNextSequenceId();
         }
 
-        /**
-         * The transport has resumed after an interruption
-         */
-        virtual void transportResumed() {}
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
Mon Aug 10 18:22:05 2009
@@ -25,80 +25,65 @@
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQProducer.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/transport/TransportListener.h>
 #include <memory>
 
 using namespace std;
+using namespace decaf::lang;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::transport;
 
-//////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionFactoryTest::test1WithStomp()
-//{
-//    try
-//    {
-//        std::string URI =
-//            "mock://127.0.0.1:23232?wireFormat=stomp";
-//
-//        ActiveMQConnectionFactory connectionFactory( URI );
-//
-//        cms::Connection* connection =
-//            connectionFactory.createConnection();
-//
-//        CPPUNIT_ASSERT( connection != NULL );
-//
-//        delete connection;
-//
-//        return;
-//    }
-//    AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
-//    AMQ_CATCHALL_NOTHROW( )
-//
-//    CPPUNIT_ASSERT( false );
-//}
-//
-//////////////////////////////////////////////////////////////////////////////////
-//void ActiveMQConnectionFactoryTest::test2WithStomp()
-//{
-//    try
-//    {
-//        std::string URI = std::string() +
-//            "mock://127.0.0.1:23232?wireFormat=stomp&"
-//            "username=" + username + "&password=" + password +
-//            "&client-id=" + clientId;
-//
-//        ActiveMQConnectionFactory connectionFactory( URI );
-//
-//        cms::Connection* connection =
-//            connectionFactory.createConnection();
-//        CPPUNIT_ASSERT( connection != NULL );
-//
-//        ActiveMQConnection* amqConnection =
-//            dynamic_cast< ActiveMQConnection* >( connection );
-//        CPPUNIT_ASSERT( amqConnection != NULL );
-//
-//        connector::Connector* connector =
-//            dynamic_cast< connector::Connector* >(
-//            amqConnection->getConnectionData()->getConnector() );
-//        CPPUNIT_ASSERT( connector != NULL );
-//
-//        CPPUNIT_ASSERT( username == connector->getUsername() );
-//        CPPUNIT_ASSERT( password == connector->getPassword() );
-//        CPPUNIT_ASSERT( clientId == connector->getClientId() );
-//
-//        // Free the allocated connection object.
-//        delete connection;
-//
-//        return;
-//    }
-//    AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
-//    AMQ_CATCHALL_NOTHROW( )
-//
-//    CPPUNIT_ASSERT( false );
-//}
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    class MyTransportListener : public TransportListener {
+    private:
+
+        bool interrupted;
+        bool resumed;
+
+    public:
+
+        MyTransportListener() {
+            this->interrupted = false;
+            this->resumed = true;
+        }
+
+        bool isInterrupted() const {
+            return this->interrupted;
+        }
+
+        bool isResumed() const {
+            return this->resumed;
+        }
+
+        virtual void onCommand( const Pointer<Command>& command ) {
+
+        }
+
+        virtual void onException( const decaf::lang::Exception& ex ) {
+
+        }
+
+        virtual void transportInterrupted() {
+            this->interrupted = true;
+        }
+
+        virtual void transportResumed() {
+            this->resumed = true;
+        }
+
+    };
+
+}}
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConnectionFactoryTest::test1WithOpenWire()
-{
+void ActiveMQConnectionFactoryTest::test1WithOpenWire() {
+
     try
     {
         std::string URI =
@@ -202,3 +187,40 @@
 
     CPPUNIT_ASSERT( false );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactoryTest::testTransportListener() {
+
+    std::string URI = "failover://(mock://localhost:61616?failOnSendMessage=true,"
+                      "mock://localhost:61618)?randomize=false";
+
+    MyTransportListener listener;
+
+    ActiveMQConnectionFactory connectionFactory( URI );
+
+    std::auto_ptr<cms::Connection> connection(
+        connectionFactory.createConnection() );
+    CPPUNIT_ASSERT( connection.get() != NULL );
+
+    ActiveMQConnection* amqConnection =
+        dynamic_cast< ActiveMQConnection* >( connection.get() );
+
+    amqConnection->addTransportListener( & listener );
+
+    std::auto_ptr<ActiveMQSession> session( dynamic_cast<ActiveMQSession*>(
+        amqConnection->createSession() ) );
+
+    std::auto_ptr<cms::Destination> destination( session->createTopic( "TEST" )
);
+
+    std::auto_ptr<ActiveMQProducer> producer( dynamic_cast<ActiveMQProducer*>(
+        session->createProducer( destination.get() ) ) );
+
+    std::auto_ptr<cms::TextMessage> message( session->createTextMessage() );
+    producer->send( message.get() );
+
+    Thread::sleep( 2000 );
+
+    CPPUNIT_ASSERT( listener.isInterrupted() );
+    CPPUNIT_ASSERT( listener.isResumed() );
+
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h?rev=802880&r1=802879&r2=802880&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.h
Mon Aug 10 18:22:05 2009
@@ -27,12 +27,11 @@
     class ActiveMQConnectionFactoryTest : public CppUnit::TestFixture
     {
         CPPUNIT_TEST_SUITE( ActiveMQConnectionFactoryTest );
-//        CPPUNIT_TEST( test1WithStomp );
-//        CPPUNIT_TEST( test2WithStomp );
         CPPUNIT_TEST( test1WithOpenWire );
         CPPUNIT_TEST( test2WithOpenWire );
         CPPUNIT_TEST( testExceptionOnCreate );
         CPPUNIT_TEST( testCreateWithURIOptions );
+        CPPUNIT_TEST( testTransportListener );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -48,12 +47,11 @@
         }
         virtual ~ActiveMQConnectionFactoryTest() {}
 
-//        void test1WithStomp();
-//        void test2WithStomp();
         void test1WithOpenWire();
         void test2WithOpenWire();
         void testExceptionOnCreate();
         void testCreateWithURIOptions();
+        void testTransportListener();
 
     };
 



Mime
View raw message