activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r518180 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire: OpenwireTempDestinationTest.cpp OpenwireTempDestinationTest.h
Date Wed, 14 Mar 2007 15:30:40 GMT
Author: tabish
Date: Wed Mar 14 08:30:33 2007
New Revision: 518180

URL: http://svn.apache.org/viewvc?view=rev&rev=518180
Log:
http://issues.apache.org/activemq/browse/AMQCPP-84

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp?view=diff&rev=518180&r1=518179&r2=518180
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.cpp
Wed Mar 14 08:30:33 2007
@@ -19,7 +19,7 @@
 
 #include <integration/IntegrationCommon.h>
 
-CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireTempDestinationTest
);
+//CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::openwire::OpenwireTempDestinationTest
);
 
 #include <activemq/concurrent/Thread.h>
 #include <activemq/concurrent/Mutex.h>
@@ -112,9 +112,42 @@
                                                    session,
                                                    responseTopic );
 
+        // Launch the Consumers in new Threads.
+        Thread requestorThread( requestConsumer );
+        Thread responderThread( responseConsumer );
+        requestorThread.start();
+        responderThread.start();
+        Thread::sleep( 100 );
+
         cms::MessageProducer* producer =
             session->createProducer( requestTopic );
 
+        // Send some bytes messages.
+        testSupport.produceTextMessages(
+            *producer, IntegrationCommon::defaultMsgCount, responseTopic );
+
+        // Let the request consumer get all its messages
+        waitForMessages( *requestConsumer,
+                         IntegrationCommon::defaultMsgCount );
+
+        // Check that we got them all.
+        CPPUNIT_ASSERT( requestConsumer->getNumReceived() ==
+                        IntegrationCommon::defaultMsgCount );
+
+        // Let the response consumer get all its messages
+        waitForMessages( *responseConsumer,
+                         IntegrationCommon::defaultMsgCount );
+
+        // Check that we got them all.
+        CPPUNIT_ASSERT( responseConsumer->getNumReceived() ==
+                        IntegrationCommon::defaultMsgCount );
+
+        // Shutdown the Consumer Threads.
+        requestConsumer->stop();
+        responseConsumer->stop();
+        requestorThread.join();
+        responderThread.join();
+
         delete producer;
         delete requestConsumer;
         delete responseConsumer;
@@ -127,6 +160,32 @@
 }
 
 ///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::waitForMessages(
+    Consumer& consumer,
+    unsigned int count )
+{
+    try
+    {
+        synchronized( &( consumer.getOnMsgMutex() ) )
+        {
+            unsigned int stopAtZero = count + 10;
+
+            while( consumer.getNumReceived() < count )
+            {
+                consumer.getOnMsgMutex().wait( 500 );
+
+                if( --stopAtZero == 0 )
+                {
+                    break;
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+///////////////////////////////////////////////////////////////////////////////
 OpenwireTempDestinationTest::Consumer::Consumer(
     cms::Connection* connection,
     cms::Session* session,
@@ -186,7 +245,25 @@
     {
         const cms::Destination* replyTo = message->getCMSReplyTo();
 
+        if( replyTo != NULL ) {
+
+            cms::MessageProducer* producer = session->createProducer( replyTo );
+            cms::Message* response = session->createMessage();
+
+            // Send it back to the replyTo Destination
+            producer->send( response );
+
+            delete response;
+            delete producer;
+        }
+
         numReceived++;
+
+        // Signal anyone waiting on us getting new messages.
+        synchronized( &onMsgMutex ){
+            onMsgMutex.notifyAll();
+        }
+
     } catch( CMSException& e ) {
         e.printStackTrace();
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h?view=diff&rev=518180&r1=518179&r2=518180
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/openwire/OpenwireTempDestinationTest.h
Wed Mar 14 08:30:33 2007
@@ -57,8 +57,9 @@
             cms::Session* session;
             cms::Destination* destination;
             cms::MessageConsumer* consumer;
-            int numReceived;
+            unsigned int numReceived;
             activemq::concurrent::Mutex mutex;
+            activemq::concurrent::Mutex onMsgMutex;
 
         public:
 
@@ -68,7 +69,11 @@
 
             virtual ~Consumer();
 
-            virtual int getNumReceived() const {
+            virtual activemq::concurrent::Mutex& getOnMsgMutex() {
+                return this->onMsgMutex;
+            }
+
+            virtual unsigned int getNumReceived() const {
                 return this->numReceived;
             }
 
@@ -76,6 +81,10 @@
             virtual void run();
             virtual void onMessage( const cms::Message* message );
         };
+
+        // Internal Wait method
+        void waitForMessages( Consumer& consumer,
+                              unsigned int count );
 
     };
 



Mime
View raw message