activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r516785 [4/4] - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration: ./ .deps/ integration/ integration/common/ integration/connector/ integration/connector/openwire/ integration/connector/stomp/ integration/durable/ integrati...
Date Sat, 10 Mar 2007 21:21:24 GMT
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,276 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TestSupport.h"
+
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <integration/IntegrationCommon.h>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/util/Guid.h>
+
+#include <cms/Connection.h>
+#include <cms/Session.h>
+
+#include <sstream>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace activemq::concurrent;
+
+using namespace integration;
+
+/**
+ * Builds a test helper bound to one broker. Nothing is connected here;
+ * initialize() opens the connection and creates the session.
+ *
+ * @param brokerUrl URL handed to the ActiveMQConnectionFactory on first use.
+ * @param ackMode   acknowledge mode passed to createSession() by initialize().
+ */
+TestSupport::TestSupport( const string& brokerUrl, cms::Session::AcknowledgeMode ackMode  )
+ : connectionFactory( NULL ),
+   connection( NULL ),
+   session( NULL ),
+   numReceived( 0 )
+{
+    // session and numReceived must start in a defined state: close(), which
+    // the destructor runs, tests and deletes session even when initialize()
+    // was never called or failed before the session was created.
+    this->ackMode = ackMode;
+    this->brokerUrl = brokerUrl;
+}
+
+/**
+ * Releases whatever initialize() acquired by delegating to close().
+ */
+TestSupport::~TestSupport() {
+    this->close();
+}
+
+/**
+ * Closes and frees the session, then the connection, then the connection
+ * factory, and resets all three pointers to NULL so the call can be repeated.
+ *
+ * Exceptions raised along the way are routed to the AMQ_CATCH_NOTHROW /
+ * AMQ_CATCHALL_NOTHROW handlers (presumably swallowed; the macro bodies are
+ * not visible from this file).
+ */
+void TestSupport::close() {
+
+    // One try block per resource: a failure while closing the session must
+    // not skip the release of the connection and the factory.
+    try
+    {
+        if( session != NULL ) {
+            session->close();
+            delete session;
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+    session = NULL;
+
+    try
+    {
+        if( connection != NULL ) {
+            connection->close();
+            delete connection;
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+    connection = NULL;
+
+    try
+    {
+        if( connectionFactory != NULL ) {
+            delete connectionFactory;
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+    connectionFactory = NULL;
+}
+
+/**
+ * Resets the received-message counter, opens and starts a connection to the
+ * broker under a freshly generated client id (empty user name and password),
+ * registers this object as the connection's exception listener and creates
+ * the session with the acknowledge mode given at construction.
+ *
+ * Failures are passed through the AMQ_CATCH_RETHROW / AMQ_CATCHALL_THROW
+ * handlers for ActiveMQException.
+ */
+void TestSupport::initialize(){
+    try
+    {
+        this->numReceived = 0;
+
+        // Anonymous login; the client id only has to be unique per run.
+        const std::string clientId = Guid().createGUIDString();
+        this->connection = createDetachedConnection( "", "", clientId );
+
+        // Connection errors are reported back to onException().
+        this->connection->setExceptionListener( this );
+        this->connection->start();
+
+        this->session = this->connection->createSession( ackMode );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+/**
+ * Creates a new connection to the configured broker without storing it in
+ * this object. The connection factory is created on the first call and
+ * reused afterwards.
+ *
+ * @param username user name passed to the factory.
+ * @param password password passed to the factory.
+ * @param clientId client id passed to the factory.
+ * @return the connection returned by the factory; this method keeps no
+ *         reference to it, so the caller is presumably responsible for
+ *         closing and deleting it.
+ */
+cms::Connection* TestSupport::createDetachedConnection(
+    const std::string& username,
+    const std::string& password,
+    const std::string& clientId ) {
+
+    try
+    {
+        // Build the factory lazily, the first time a connection is requested.
+        if( connectionFactory == NULL ) {
+            connectionFactory = new ActiveMQConnectionFactory( brokerUrl );
+        }
+
+        return connectionFactory->createConnection(
+            username, password, clientId );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+/**
+ * Pauses the calling thread for IntegrationCommon::defaultDelay; the produce*
+ * helpers call this after every send.
+ */
+void TestSupport::doSleep()
+{
+    Thread::sleep( IntegrationCommon::defaultDelay );
+}
+
+/**
+ * Sends `count` text messages through `producer`, sleeping via doSleep()
+ * after each one. Message i carries the text
+ * "this is a test text message: id = i".
+ *
+ * @param producer producer used for every send.
+ * @param count    number of messages to send.
+ * @return the number of messages actually sent (equal to `count` when no
+ *         exception is raised).
+ */
+unsigned int TestSupport::produceTextMessages( 
+    cms::MessageProducer& producer,
+    unsigned int count )
+{
+    try
+    {
+        const string prefix = "this is a test text message: id = ";
+
+        // One message object is reused for every send.
+        cms::TextMessage* textMsg = session->createTextMessage();
+        unsigned int realCount = 0;
+
+        try
+        {
+            for( unsigned int ix = 0; ix < count; ++ix ) {
+                // A fresh stream per message; no explicit terminator is
+                // needed because c_str() already yields a terminated string.
+                ostringstream stream;
+                stream << prefix << ix;
+                textMsg->setText( stream.str().c_str() );
+                producer.send( textMsg );
+                doSleep();
+                ++realCount;
+            }
+        }
+        catch( ... )
+        {
+            // Do not leak the message when a send fails part-way through.
+            delete textMsg;
+            throw;
+        }
+
+        delete textMsg;
+
+        return realCount;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )    
+}
+
+/**
+ * Sends the same 10-byte bytes message `count` times through `producer`,
+ * sleeping via doSleep() after each send.
+ *
+ * @param producer producer used for every send.
+ * @param count    number of messages to send.
+ * @return the number of messages actually sent (equal to `count` when no
+ *         exception is raised).
+ */
+unsigned int TestSupport::produceBytesMessages( 
+    cms::MessageProducer& producer,
+    unsigned int count )
+{
+    try
+    {
+        // Fixed payload; the two trailing elements are zero-initialised by
+        // the aggregate initialiser, so no memset is required.
+        unsigned char buf[10] = { 0, 1, 2, 3, 0, 4, 5, 6 };
+
+        cms::BytesMessage* bytesMsg = session->createBytesMessage();
+        unsigned int realCount = 0;
+
+        try
+        {
+            bytesMsg->setBodyBytes( buf, 10 );
+
+            for( unsigned int ix = 0; ix < count; ++ix ) {
+                producer.send( bytesMsg );
+                doSleep();
+                ++realCount;
+            }
+        }
+        catch( ... )
+        {
+            // Do not leak the message when a send fails part-way through.
+            delete bytesMsg;
+            throw;
+        }
+
+        delete bytesMsg;
+
+        return realCount;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )    
+}
+
+/**
+ * Blocks until at least `count` messages have been counted by onMessage(),
+ * or until count + 5 timed waits of 500 (the unit is whatever Mutex::wait
+ * takes, presumably milliseconds) have elapsed, whichever comes first.
+ * Callers must check getNumReceived() afterwards; giving up is not reported.
+ *
+ * @param count number of received messages to wait for.
+ */
+void TestSupport::waitForMessages( unsigned int count )
+{
+    try
+    {
+        synchronized( &mutex )
+        {
+            // Bounded number of wake-ups so a lost message cannot hang the test.
+            for( int waitsLeft = count + 5; numReceived < count; )
+            {
+                mutex.wait( 500 );
+
+                if( --waitsLeft == 0 ) {
+                    break;
+                }
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )    
+}
+
+/**
+ * cms::ExceptionListener callback; initialize() registers this object on the
+ * connection. Any reported error fails the running test unconditionally.
+ *
+ * @param error the reported exception; it is not inspected.
+ */
+void TestSupport::onException( const cms::CMSException& error )
+{
+    // Asserting on a constant false always fails. CPPUNIT_ASSERT stringifies
+    // its argument, so the variable name shows up in the failure report.
+    bool AbstractTester = false;
+    CPPUNIT_ASSERT( AbstractTester );
+}
+
+/**
+ * cms::MessageListener callback. Text and bytes messages are counted in
+ * numReceived and every thread waiting on the mutex is woken; messages of
+ * any other kind are ignored. When IntegrationCommon::debug is set the
+ * message content is printed.
+ *
+ * @param message the delivered message.
+ */
+void TestSupport::onMessage( const cms::Message* message )
+{
+    bool counted = false;
+
+    // Got a text message?
+    const cms::TextMessage* txtMsg = 
+        dynamic_cast<const cms::TextMessage*>(message);
+
+    if( txtMsg != NULL )
+    {
+        if( IntegrationCommon::debug ) {
+            printf("received text msg: %s\n", txtMsg->getText().c_str() );
+        }
+
+        counted = true;
+    }
+    else
+    {
+        // Got a bytes msg?
+        const cms::BytesMessage* bytesMsg = 
+            dynamic_cast<const cms::BytesMessage*>(message);
+
+        if( bytesMsg != NULL )
+        {
+            if( IntegrationCommon::debug ) {
+                const unsigned char* bytes = bytesMsg->getBodyBytes();
+                string transcode( (const char*)bytes, bytesMsg->getBodyLength() );
+                printf("received bytes msg: %s\n", transcode.c_str() );
+            }
+
+            counted = true;
+        }
+    }
+
+    if( !counted ) {
+        return;
+    }
+
+    // Update the counter under the same lock waitForMessages() holds while
+    // reading it, then signal that we got one.
+    synchronized( &mutex )
+    {
+        ++numReceived;
+        mutex.notifyAll();
+    }
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_TESTSUPPORT_H_
+#define _INTEGRATION_TESTSUPPORT_H_
+
+#include <activemq/concurrent/Mutex.h>
+
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+#include <cms/ExceptionListener.h>
+#include <cms/MessageListener.h>
+
+namespace integration{
+
+    /**
+     * Shared plumbing for the broker integration tests. Owns a connection
+     * factory, one connection and one session; counts the text and bytes
+     * messages delivered to it as a cms::MessageListener; and fails the
+     * running test when the connection reports an error through
+     * cms::ExceptionListener.
+     */
+    class TestSupport : public cms::ExceptionListener, public cms::MessageListener
+    {
+    public:
+
+        /**
+         * @param brokerUrl URL of the broker to connect to.
+         * @param ackMode   acknowledge mode for the session created by
+         *                  initialize().
+         */
+        TestSupport( const std::string& brokerUrl,
+                     cms::Session::AcknowledgeMode ackMode = cms::Session::AUTO_ACKNOWLEDGE );
+        virtual ~TestSupport();
+
+        /** Connects to the broker and creates the session. */
+        virtual void initialize();
+
+        /** Closes and frees the session, connection and factory. */
+        virtual void close();
+
+        /** @return the mutex guarding the wait/notify handshake. */
+        virtual activemq::concurrent::Mutex& getMutex() {
+            return mutex;
+        }
+
+        /** @return a copy of the broker URL given at construction. */
+        virtual std::string getBrokerUrl() const {
+            return brokerUrl;
+        }
+
+        /** @return the owned connection pointer (still owned by this object). */
+        virtual cms::Connection* getConnection() {
+            return connection;
+        }
+
+        /** @return the owned session pointer (still owned by this object). */
+        virtual cms::Session* getSession() {
+            return session;
+        }
+
+        /** @return the number of messages counted so far by onMessage(). */
+        virtual unsigned int getNumReceived() const {
+            return numReceived;
+        }
+
+        /** Overwrites the received-message counter. */
+        virtual void setNumReceived( unsigned int numReceived ) {
+            this->numReceived = numReceived;
+        }
+
+        /** Sleeps for the default inter-message delay. */
+        virtual void doSleep();
+
+        /** Sends `count` text messages; returns how many were sent. */
+        virtual unsigned int produceTextMessages( 
+            cms::MessageProducer& producer,
+            unsigned int count );
+
+        /** Sends `count` bytes messages; returns how many were sent. */
+        virtual unsigned int produceBytesMessages( 
+            cms::MessageProducer& producer,
+            unsigned int count );
+
+        /** Waits, with a bounded number of retries, for `count` messages. */
+        virtual void waitForMessages( unsigned int count );
+
+        virtual void onException( const cms::CMSException& error );
+        virtual void onMessage( const cms::Message* message );
+
+        /** Creates a connection that is not stored in this object. */
+        virtual cms::Connection* createDetachedConnection(
+            const std::string& username,
+            const std::string& password,
+            const std::string& clientId );
+
+    public:
+
+        std::string brokerUrl;
+        cms::ConnectionFactory* connectionFactory;
+        cms::Connection* connection;
+        cms::Session* session;
+
+        unsigned int numReceived;
+        activemq::concurrent::Mutex mutex;
+        cms::Session::AcknowledgeMode ackMode;
+
+    private:
+
+        // Not copyable: close() deletes the raw pointers above, so a copy
+        // would free the same objects twice. Declared but never defined.
+        TestSupport( const TestSupport& );
+        TestSupport& operator=( const TestSupport& );
+
+    };
+
+}
+
+#endif /*_INTEGRATION_TESTSUPPORT_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "AsyncSender.h"
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::stomp;
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::AsyncSenderTest );
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Points the embedded TestSupport at the local Stomp broker URL with
+ * useAsyncSend=true and connects immediately, so constructing the fixture
+ * already calls testSupport.initialize().
+ */
+AsyncSenderTest::AsyncSenderTest()
+    : testSupport( "stomp://localhost:61613?useAsyncSend=true" )
+{
+    this->testSupport.initialize();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Nothing to do explicitly; the testSupport member runs its own destructor.
+ */
+AsyncSenderTest::~AsyncSenderTest() {}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Publishes defaultMsgCount text messages followed by defaultMsgCount bytes
+ * messages to the topic "mytopic" and asserts that the listener counted all
+ * of them. The topic, consumer and producer are freed on both the success
+ * and the failure path.
+ */
+void AsyncSenderTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+
+        // Start out NULL so the failure path can delete them unconditionally.
+        cms::Topic* topic = NULL;
+        cms::MessageConsumer* consumer = NULL;
+        cms::MessageProducer* producer = NULL;
+
+        try
+        {
+            // Create CMS Object for Comms
+            cms::Session* session = testSupport.getSession();
+            topic = session->createTopic("mytopic");
+            consumer = session->createConsumer( topic );
+            consumer->setMessageListener( &testSupport );
+            producer = session->createProducer( topic );
+
+            // Send some text messages
+            testSupport.produceTextMessages( 
+                *producer, IntegrationCommon::defaultMsgCount );
+
+            // Send some bytes messages.
+            testSupport.produceBytesMessages( 
+                *producer, IntegrationCommon::defaultMsgCount );
+
+            // Wait for the messages to get here
+            testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+            unsigned int numReceived = testSupport.getNumReceived();
+            if( IntegrationCommon::debug ) {
+                // %u: numReceived is unsigned.
+                printf("received: %u\n", numReceived );
+            }
+            CPPUNIT_ASSERT( 
+                numReceived == IntegrationCommon::defaultMsgCount * 2 );
+        }
+        catch( ... )
+        {
+            // A failed assertion or send must not leak the CMS objects.
+            delete producer;
+            delete consumer;
+            delete topic;
+            throw;
+        }
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        delete producer;                      
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef _INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_
+#define _INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+
+    /**
+     * CppUnit fixture with a single test case, test(), which sends messages
+     * over a Stomp connection opened with useAsyncSend=true.
+     */
+    class AsyncSenderTest : public CppUnit::TestFixture {
+        
+        CPPUNIT_TEST_SUITE( AsyncSenderTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    private:
+    
+        // Connection/session holder; it also acts as the message listener.
+        TestSupport testSupport;
+        
+    public:
+    
+        AsyncSenderTest();
+        virtual ~AsyncSenderTest();
+        
+        // Default integration URL with the async-send option appended.
+        // NOTE(review): nothing in the visible sources calls this; the
+        // constructor in AsyncSenderTest.cpp hard-codes its own URL. Confirm
+        // whether this accessor is still needed.
+        virtual std::string getBrokerURL() const {
+            return common::IntegrationCommon::defaultURL + "?useAsyncSend=true";
+        }
+
+        // The test case registered with the suite above.
+        virtual void test();
+
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "DurableTester.h"
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::DurableTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::stomp;
+
+/**
+ * Points the embedded TestSupport at the local Stomp broker URL and connects
+ * immediately, so constructing the fixture already calls
+ * testSupport.initialize().
+ */
+DurableTest::DurableTest()
+    : testSupport( "stomp://localhost:61613" )
+{
+    this->testSupport.initialize();
+}
+
+/**
+ * Nothing to do explicitly; the testSupport member runs its own destructor.
+ */
+DurableTest::~DurableTest() {
+}
+
+/**
+ * Exercises a durable topic subscription on "mytopic":
+ *   1. subscribe under a unique name, send 3 messages, expect 3 received;
+ *   2. drop the consumer, send 3 more while nobody is subscribed;
+ *   3. re-subscribe under the same name, send 3 more, expect 9 in total.
+ * The topic, consumer and producer are freed on both the success and the
+ * failure path.
+ */
+void DurableTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms durable test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+
+        // Same GUID helper that TestSupport uses for its client id.
+        std::string subName = Guid().createGUIDString();
+
+        // Start out NULL so the failure path can delete them unconditionally.
+        cms::Topic* topic = NULL;
+        cms::MessageConsumer* consumer = NULL;
+        cms::MessageProducer* producer = NULL;
+
+        try
+        {
+            // Create CMS Object for Comms
+            cms::Session* session = testSupport.getSession();
+            topic = session->createTopic("mytopic");
+            consumer = session->createDurableConsumer( topic, subName, "" );
+            consumer->setMessageListener( &testSupport );
+            producer = session->createProducer( topic );
+
+            unsigned int sent;
+
+            // Send some text messages
+            sent = testSupport.produceTextMessages( *producer, 3 );
+
+            // Wait for all messages
+            testSupport.waitForMessages( sent );
+
+            unsigned int numReceived = testSupport.getNumReceived();
+
+            if( IntegrationCommon::debug ) {
+                // %u: numReceived is unsigned.
+                printf("received: %u\n", numReceived );
+            }
+
+            CPPUNIT_ASSERT( numReceived == sent );
+
+            // Nuke the consumer; clear the pointer so the failure path
+            // cannot delete it a second time.
+            delete consumer;
+            consumer = NULL;
+
+            // Send some text messages
+            sent += testSupport.produceTextMessages( *producer, 3 );
+
+            consumer = session->createDurableConsumer( topic, subName, "" );
+            consumer->setMessageListener( &testSupport );
+
+            // Send some text messages
+            sent += testSupport.produceTextMessages( *producer, 3 );
+
+            // Wait for all remaining messages
+            testSupport.waitForMessages( sent );
+
+            numReceived = testSupport.getNumReceived();
+            if( IntegrationCommon::debug ) {
+                printf("received: %u\n", numReceived );
+            }
+            CPPUNIT_ASSERT( numReceived == sent );
+        }
+        catch( ... )
+        {
+            // A failed assertion or send must not leak the CMS objects.
+            delete producer;
+            delete consumer;
+            delete topic;
+            throw;
+        }
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        delete producer;                      
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_
+#define _INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+
+    /**
+     * CppUnit fixture with a single test case, test(), which exercises
+     * durable topic subscriptions over a Stomp connection.
+     */
+    class DurableTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( DurableTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    private:
+    
+        // Connection/session holder; it also acts as the message listener.
+        TestSupport testSupport;
+        
+    public:
+
+    	DurableTest();
+    	virtual ~DurableTest();
+
+        // The test case registered with the suite above.
+        virtual void test();
+        
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,350 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ExpirationTest.h"
+
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::ExpirationTest );
+
+#include <sstream>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/Session.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace std;
+using namespace integration;
+using namespace integration::connector::stomp;
+
+// Tag written into the "messageTag" property of every message sent by the
+// Producer below; Consumer::onMessage() ignores messages whose property is
+// missing or different.
+std::string messageTag = Guid().createGUID();
+
+/**
+ * Runnable that publishes text messages to the topic "expirationTopic" on
+ * the broker at tcp://localhost:61613.  Every message carries the file-level
+ * messageTag in its "messageTag" string property.  The CMS objects created
+ * by run() are released when the Producer is destroyed.
+ */
+class Producer : public Runnable {
+private:
+
+    ActiveMQConnectionFactory* connectionFactory;
+    Connection* connection;
+    Session* session;
+    Topic* destination;
+    MessageProducer* producer;
+    int numMessages;
+    long long timeToLive;
+    bool disableTimeStamps;
+
+public:
+
+    /**
+     * @param numMessages number of messages run() sends.
+     * @param timeToLive value run() passes to MessageProducer::setTimeToLive.
+     */
+    Producer( int numMessages, long long timeToLive ){
+        // Every pointer starts out NULL: cleanup() runs from the destructor
+        // even when run() was never invoked.
+        connectionFactory = NULL;
+        connection = NULL;
+        session = NULL;
+        destination = NULL;
+        producer = NULL;
+        this->numMessages = numMessages;
+        this->timeToLive = timeToLive;
+        this->disableTimeStamps = false;
+    }
+
+    virtual ~Producer(){
+        cleanup();
+    }
+
+    /** @return the flag run() hands to setDisableMessageTimeStamp. */
+    virtual bool getDisableTimeStamps() const {
+        return disableTimeStamps;
+    }
+
+    /** @param value flag run() hands to setDisableMessageTimeStamp. */
+    virtual void setDisableTimeStamps( bool value ) {
+        this->disableTimeStamps = value;
+    }
+
+    /**
+     * Connects, creates the session, topic and producer, then sends
+     * numMessages text messages.  A CMSException is printed, not rethrown.
+     */
+    virtual void run() {
+        try {
+            // Create a ConnectionFactory
+            connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61613");
+
+            // Create a Connection
+            connection = connectionFactory->createConnection();
+            connection->start();
+
+            string sss=connection->getClientId();
+            cout << sss << endl;
+
+            session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+            destination = session->createTopic( "expirationTopic" );
+
+            producer = session->createProducer( destination );
+            producer->setDeliveryMode( DeliveryMode::PERSISTENT );
+            producer->setDisableMessageTimeStamp( disableTimeStamps );
+
+            // Use the value given to the constructor instead of a fixed 1.
+            producer->setTimeToLive( timeToLive );
+
+            // Create the Thread Id String
+            string threadIdStr = Integer::toString( Thread::getId() );
+
+            // Create the message text
+            string text = (string)"Hello world! from thread " + threadIdStr;
+
+            for( int ix=0; ix<numMessages; ++ix ){
+                TextMessage* message = session->createTextMessage( text );
+                message->setStringProperty( "messageTag", messageTag );
+                producer->send( message );
+                delete message;
+            }
+
+        }catch ( CMSException& e ) {
+            e.printStackTrace();
+        }
+    }
+
+private:
+
+    /**
+     * Releases everything run() created.  Safe to call more than once
+     * because each pointer is reset to NULL after it is deleted.
+     */
+    void cleanup(){
+
+        // Destroy resources.
+        try{
+            delete destination;
+        }catch ( CMSException& ) {}
+        destination = NULL;
+
+        try{
+            delete producer;
+        }catch ( CMSException& ) {}
+        producer = NULL;
+
+        // Close open resources.  Each close has its own try block so that
+        // a failing session close cannot skip the connection close.
+        try{
+            if( session != NULL ) session->close();
+        }catch ( CMSException& ) {}
+
+        try{
+            if( connection != NULL ) connection->close();
+        }catch ( CMSException& ) {}
+
+        try{
+            delete session;
+        }catch ( CMSException& ) {}
+        session = NULL;
+
+        try{
+            delete connection;
+        }catch ( CMSException& ) {}
+        connection = NULL;
+
+        try{
+            delete connectionFactory;
+        }catch ( CMSException& ) {}
+        connectionFactory = NULL;
+    }
+};
+
+/**
+ * Runnable and MessageListener that subscribes to the topic
+ * "expirationTopic?consumer.retroactive=true" on the broker at
+ * tcp://localhost:61613 and counts the text messages it receives whose
+ * "messageTag" property equals the file-level messageTag.
+ */
+class Consumer : public MessageListener, public Runnable {
+
+private:
+
+    Connection* connection;
+    Session* session;
+    Topic* destination;
+    MessageConsumer* consumer;
+    long waitMillis;
+    int numReceived;
+
+public:
+
+    /**
+     * @param waitMillis how long run() sleeps while messages arrive.
+     */
+    Consumer( long waitMillis ){
+        connection = NULL;
+        session = NULL;
+        destination = NULL;
+        consumer = NULL;
+        this->waitMillis = waitMillis;
+        numReceived = 0;
+    }
+
+    virtual ~Consumer(){
+        cleanup();
+    }
+
+    /** @return number of matching messages counted by onMessage(). */
+    virtual int getNumReceived() const{
+        return numReceived;
+    }
+
+    /**
+     * Connects, registers this object as the message listener and then
+     * sleeps for waitMillis.  A CMSException is printed, not rethrown.
+     */
+    virtual void run() {
+
+        try {
+
+            const string user = "default";
+            const string passwd = "";
+            const string sID = "lsgID";
+
+            // Create a ConnectionFactory
+            ActiveMQConnectionFactory* connectionFactory =
+               new ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
+
+            // Create a Connection.  The factory is only needed for this one
+            // call, so it is released whether or not the call succeeds.
+            try {
+                connection = connectionFactory->createConnection();
+            } catch( ... ) {
+                delete connectionFactory;
+                throw;
+            }
+            delete connectionFactory;
+            connection->start();
+
+            // Create a Session
+            session = connection->createSession( Session::AUTO_ACKNOWLEDGE);
+
+            // Create the destination (Topic or Queue)
+            destination = session->createTopic( "expirationTopic?consumer.retroactive=true");
+
+            consumer = session->createConsumer( destination );
+
+            consumer->setMessageListener( this );
+
+            // Sleep while asynchronous messages come in.
+            Thread::sleep( waitMillis );
+
+        } catch (CMSException& e) {
+            e.printStackTrace();
+        }
+    }
+
+    /**
+     * Counts the message when it carries the expected "messageTag"
+     * property and is a TextMessage; everything else is ignored.
+     */
+    virtual void onMessage( const Message* message ){
+
+        try
+        {
+            if( !message->propertyExists( "messageTag" ) ||
+                message->getStringProperty("messageTag") != messageTag ){
+                return;
+            }
+
+            const TextMessage* textMessage =
+                dynamic_cast< const TextMessage* >( message );
+
+            // A tagged message of another type must not be dereferenced
+            // as a TextMessage.
+            if( textMessage == NULL ){
+                return;
+            }
+
+            // Read the body before counting: if this throws, the catch
+            // below reports it and the message is not counted.
+            textMessage->getText();
+            numReceived++;
+        } catch (CMSException& e) {
+            e.printStackTrace();
+        }
+    }
+
+private:
+
+    /**
+     * Releases everything run() created.  Safe to call more than once
+     * because each pointer is reset to NULL after it is deleted.
+     */
+    void cleanup(){
+
+        // Destroy resources.
+        try{
+            delete destination;
+        }catch (CMSException& ) {}
+        destination = NULL;
+
+        try{
+            delete consumer;
+        }catch (CMSException& ) {}
+        consumer = NULL;
+
+        // Close open resources.  Each close has its own try block so that
+        // a failing session close cannot skip the connection close.
+        try{
+            if( session != NULL ) session->close();
+        }catch (CMSException& ) {}
+
+        try{
+            if( connection != NULL ) connection->close();
+        }catch (CMSException& ) {}
+
+        try{
+            delete session;
+        }catch (CMSException& ) {}
+        session = NULL;
+
+        try{
+            delete connection;
+        }catch (CMSException& ) {}
+        connection = NULL;
+    }
+};
+
+// Runs a Producer constructed with ( 1, 1 ) to completion on its own thread,
+// waits, then runs a Consumer that listens for 2000 ms.  The test passes
+// only when the Consumer counted no tagged message.
+void ExpirationTest::testExpired()
+{
+    Producer producer( 1, 1 );
+    Thread producerThread( &producer );
+    producerThread.start();
+    producerThread.join();
+    
+    // Pause between the end of the send and the start of the consumer.
+    Thread::sleep( 100 );
+
+    Consumer consumer( 2000 );
+    Thread consumerThread( &consumer );
+    consumerThread.start();
+    consumerThread.join();
+    
+    Thread::sleep( 100 );
+
+    CPPUNIT_ASSERT( consumer.getNumReceived() == 0 );
+}
+
+// Runs a Producer constructed with ( 2, 2000 ) and message time stamps
+// disabled, then a Consumer that listens for 3000 ms.  The test passes only
+// when the Consumer counted exactly two tagged messages.
+void ExpirationTest::testNotExpired()
+{
+    Producer producer( 2, 2000 );
+    producer.setDisableTimeStamps( true );
+    Thread producerThread( &producer );
+    producerThread.start();
+    producerThread.join();
+    
+    Consumer consumer( 3000 );
+    Thread consumerThread( &consumer );
+    consumerThread.start();
+    consumerThread.join();
+
+    Thread::sleep( 50 );
+    
+    CPPUNIT_ASSERT( consumer.getNumReceived() == 2 );
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_
+#define _INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+
+    /**
+     * CppUnit fixture with two test cases, testExpired() and
+     * testNotExpired().  It lives in integration::connector::stomp, the
+     * namespace named by this header's include guard and by the
+     * fully-qualified name under which the suite is registered.
+     */
+    class ExpirationTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( ExpirationTest );
+        CPPUNIT_TEST( testExpired );
+        CPPUNIT_TEST( testNotExpired );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        ExpirationTest(){}
+        virtual ~ExpirationTest(){}
+
+        /** Test case that expects no message to reach the consumer. */
+        virtual void testExpired();
+
+        /** Test case that expects every sent message to be received. */
+        virtual void testNotExpired();
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SimpleRollbackTest.h"
+
+#include <integration/common/IntegrationCommon.h>
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::SimpleRollbackTest );
+
+#include <sstream>
+
+#include <activemq/core/ActiveMQConnectionFactory.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+#include <activemq/util/Config.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/Session.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace std;
+using namespace integration;
+using namespace integration::connector::stomp;
+
+/**
+ * Connects to stomp://localhost:61613 (passing two empty strings and a newly
+ * generated GUID string to createConnection), registers this object as the
+ * connection's exception listener, starts the connection and opens a
+ * transacted session.
+ *
+ * @throws ActiveMQException if any step fails; objects created before the
+ *         failure are released first.
+ */
+SimpleRollbackTest::SimpleRollbackTest()
+:
+    connectionFactory( NULL ),
+    connection( NULL ),
+    session( NULL ),
+    numReceived( 0 ),
+    msgCount( 1 )   // Default amount to send and receive
+{
+    try
+    {
+        try
+        {
+            string url = "stomp://localhost:61613";
+
+            // Create a Factory
+            connectionFactory = new ActiveMQConnectionFactory( url );
+
+            // Now create the connection
+            connection = connectionFactory->createConnection(
+                "", "", Guid().createGUIDString() );
+
+            // Set ourself as a recipient of Exceptions
+            connection->setExceptionListener( this );
+            connection->start();
+
+            // Create a Session
+            session = connection->createSession(
+                cms::Session::SESSION_TRANSACTED );
+        }
+        catch( ... )
+        {
+            // The destructor does not run when a constructor throws, so
+            // whatever was created so far has to be released here.
+            delete session;
+            session = NULL;
+            delete connection;
+            connection = NULL;
+            delete connectionFactory;
+            connectionFactory = NULL;
+            throw;
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+/**
+ * Closes the session and the connection, then deletes the session, the
+ * connection and the factory.  Each step is guarded separately so that a
+ * failure in one does not skip the others; nothing is thrown.
+ */
+SimpleRollbackTest::~SimpleRollbackTest()
+{
+    try
+    {
+        if( session != NULL ) session->close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        if( connection != NULL ) connection->close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        delete session;
+        session = NULL;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        delete connection;
+        connection = NULL;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    try
+    {
+        delete connectionFactory;
+        connectionFactory = NULL;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+// Sends count text messages "SimpleTest - Message #<index>" through producer,
+// reusing one TextMessage created from session.  The message is deleted
+// even when a send throws.
+static void sendNumberedTextMessages( cms::Session* session,
+                                      cms::MessageProducer* producer,
+                                      unsigned int count )
+{
+    cms::TextMessage* textMsg = session->createTextMessage();
+
+    try
+    {
+        for( unsigned int ix = 0; ix < count; ++ix )
+        {
+            // No std::ends here: on an ostringstream it would append a NUL
+            // character to the message text.
+            ostringstream lcStream;
+            lcStream << "SimpleTest - Message #" << ix;
+            textMsg->setText( lcStream.str() );
+            producer->send( textMsg );
+        }
+    }
+    catch( ... )
+    {
+        delete textMsg;
+        throw;
+    }
+
+    delete textMsg;
+}
+
+/**
+ * Sends msgCount messages and commits, sends msgCount more and rolls back,
+ * then sends two further messages after the rollback, sleeping between the
+ * steps.  This object is the listener for the consumer on "mytopic".
+ *
+ * @throws ActiveMQException if any CMS call fails.
+ */
+void SimpleRollbackTest::test()
+{
+    try
+    {
+        // Create CMS Object for Comms
+        cms::Topic* topic = session->createTopic("mytopic");
+        cms::MessageConsumer* consumer =
+            session->createConsumer( topic );
+        consumer->setMessageListener( this );
+        cms::MessageProducer* producer =
+            session->createProducer( topic );
+
+        // First batch: sent and committed.
+        sendNumberedTextMessages( session, producer, msgCount );
+
+        Thread::sleep( 100 );
+
+        session->commit();
+
+        // Second batch: sent and rolled back.
+        sendNumberedTextMessages( session, producer, msgCount );
+
+        Thread::sleep( 500 );
+
+        session->rollback();
+
+        Thread::sleep( 500 );
+
+        cms::TextMessage* textMsg = session->createTextMessage();
+        textMsg->setText( "SimpleTest - Message after Rollback" );
+        producer->send( textMsg );
+        delete textMsg;
+
+        Thread::sleep( 15000 );
+
+        // NOTE(review): this assertion can never fail and numReceived is
+        // not checked anywhere in this test - confirm what was intended.
+        CPPUNIT_ASSERT( true );
+
+        textMsg = session->createTextMessage();
+        textMsg->setText( "SimpleTest - Message after Rollback" );
+        producer->send( textMsg );
+        delete textMsg;
+
+        if( IntegrationCommon::debug ) {
+            printf( "Shutting Down\n" );
+        }
+
+        delete producer;
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+// Exception-listener callback (this object is registered through
+// setExceptionListener in the constructor).  The asserted flag is always
+// false, so any exception reported here raises a CppUnit assertion failure;
+// the error argument itself is not inspected.
+void SimpleRollbackTest::onException( const cms::CMSException& error AMQCPP_UNUSED)
+{
+    bool AbstractTester = false;
+    CPPUNIT_ASSERT( AbstractTester );
+}
+
+/**
+ * Message-listener callback.  A TextMessage or, failing that, a
+ * BytesMessage is printed when IntegrationCommon::debug is set, counted in
+ * numReceived and announced with mutex.notifyAll().  Any other message type
+ * is ignored.  Exceptions are swallowed by the trailing catch macros.
+ */
+void SimpleRollbackTest::onMessage( const cms::Message* message )
+{
+    try
+    {
+        const cms::TextMessage* asText =
+            dynamic_cast<const cms::TextMessage*>( message );
+
+        if( asText != NULL )
+        {
+            // Got a text message.
+            if( IntegrationCommon::debug ) {
+                printf("received text msg: %s\n", asText->getText().c_str() );
+            }
+        }
+        else
+        {
+            // Not text, so see whether it is a bytes message instead.
+            const cms::BytesMessage* asBytes =
+                dynamic_cast<const cms::BytesMessage*>( message );
+
+            if( asBytes == NULL )
+            {
+                // Neither kind: nothing to count.
+                return;
+            }
+
+            const unsigned char* body = asBytes->getBodyBytes();
+            string transcode( (const char*)body, asBytes->getBodyLength() );
+
+            if( IntegrationCommon::debug ) {
+                printf("Received Bytes Message: %s", transcode.c_str() );
+            }
+        }
+
+        // Shared tail for both message kinds: count it, then signal.
+        numReceived++;
+
+        synchronized( &mutex )
+        {
+            mutex.notifyAll();
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_
+#define _INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <activemq/concurrent/Mutex.h>
+
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+#include <cms/Session.h>
+#include <cms/MessageProducer.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+
+    /**
+     * CppUnit fixture with the single test case test().  The fixture is
+     * also a cms::ExceptionListener and a cms::MessageListener, so it can
+     * be handed to CMS objects as the target of their callbacks.
+     */
+    class SimpleRollbackTest : public CppUnit::TestFixture,
+                               public cms::ExceptionListener,
+                               public cms::MessageListener
+    {
+        CPPUNIT_TEST_SUITE( SimpleRollbackTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        SimpleRollbackTest();
+        virtual ~SimpleRollbackTest();
+
+        /** The test case listed in the suite above. */
+        virtual void test();
+
+        /** cms::ExceptionListener callback. */
+        virtual void onException( const cms::CMSException& error );
+
+        /** cms::MessageListener callback. */
+        virtual void onMessage( const cms::Message* message );
+
+    private:
+
+        // CMS objects held as raw pointers for the life of the fixture.
+        cms::ConnectionFactory* connectionFactory;
+        cms::Connection* connection;
+        cms::Session* session;
+
+        // Message counters and the mutex declared alongside them.
+        unsigned int numReceived;
+        unsigned int msgCount;
+        activemq::concurrent::Mutex mutex;
+
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,136 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SimpleTest.h"
+#include <integration/common/IntegrationCommon.h>
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::SimpleTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::stomp;
+
+/**
+ * Builds the support object for stomp://localhost:61613 and initializes it.
+ */
+SimpleTest::SimpleTest()
+:
+    testSupport( "stomp://localhost:61613" )
+{
+    testSupport.initialize();
+}
+
+SimpleTest::~SimpleTest()
+{
+}
+
+/**
+ * Sends two batches of IntegrationCommon::defaultMsgCount messages to
+ * "mytopic", waits for them through the support object and asserts that
+ * exactly twice defaultMsgCount messages were received.
+ *
+ * @throws ActiveMQException if any CMS call fails.
+ */
+void SimpleTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay
+                 << " milli-seconds) ...\n"
+                 << endl;
+        }
+
+        // Create CMS Object for Comms
+        cms::Session* session = testSupport.getSession();
+        cms::Topic* topic = session->createTopic("mytopic");
+        cms::MessageConsumer* consumer =
+            session->createConsumer( topic );
+        consumer->setMessageListener( &testSupport );
+        cms::MessageProducer* producer =
+            session->createProducer( topic );
+
+        // Send some text messages
+        testSupport.produceTextMessages(
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        // Send a second batch of text messages.
+        // NOTE(review): the earlier comment here said "bytes messages";
+        // confirm whether a bytes-producing helper was meant to be called.
+        testSupport.produceTextMessages(
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        // Wait for the messages to get here
+        testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+
+        unsigned int numReceived = testSupport.getNumReceived();
+        if( IntegrationCommon::debug ) {
+            // %u matches the unsigned counter.
+            printf("received: %u\n", numReceived );
+        }
+        CPPUNIT_ASSERT(
+            numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        delete producer;
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_
+#define _INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+#include <integration/TestSupport.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+    
+    /**
+     * CppUnit fixture whose suite consists of the single test case test().
+     * The constructor, destructor and test() are only declared here; their
+     * definitions live outside this header.
+     */
+    class SimpleTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( SimpleTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        SimpleTest();
+        virtual ~SimpleTest();
+
+        /** The test case listed in the suite above. */
+        virtual void test();
+
+    private:
+
+        // Support object held by value, so it lives as long as the fixture.
+        TestSupport testSupport;
+
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_*/

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp Sat Mar 10 13:21:22 2007
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "TransactionTest.h"
+#include <integration/common/IntegrationCommon.h>
+
+CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::TransactionTest );
+
+#include <activemq/concurrent/Thread.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/util/Boolean.h>
+
+#include <cms/Connection.h>
+#include <cms/MessageConsumer.h>
+#include <cms/MessageProducer.h>
+#include <cms/MessageListener.h>
+#include <cms/Startable.h>
+#include <cms/Closeable.h>
+#include <cms/MessageListener.h>
+#include <cms/ExceptionListener.h>
+#include <cms/Topic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Session.h>
+#include <cms/BytesMessage.h>
+#include <cms/TextMessage.h>
+#include <cms/MapMessage.h>
+
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::util;
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+using namespace activemq::concurrent;
+
+using namespace integration;
+using namespace integration::connector::stomp;
+
+/**
+ * Builds the fixture: testSupport is constructed with the Stomp URL on
+ * localhost and the SESSION_TRANSACTED mode, and is then initialized.
+ */
+TransactionTest::TransactionTest()
+    : testSupport( "stomp://localhost:61613", cms::Session::SESSION_TRANSACTED )
+{
+    // Initialization is done up front so test() can use testSupport directly.
+    testSupport.initialize();
+}
+
+/** Destructor; performs no explicit cleanup of its own. */
+TransactionTest::~TransactionTest()
+{
+}
+
+/**
+ * Exercises commit and rollback on the session provided by testSupport.
+ *
+ * Two batches of text messages are produced, each followed by a commit, and
+ * the number of received messages is asserted. A third batch is then
+ * produced and rolled back, and the received count is asserted again.
+ *
+ * Exceptions escaping the body are handled by the AMQ_CATCH_RETHROW /
+ * AMQ_CATCHALL_THROW macros at the end of the function.
+ */
+void TransactionTest::test()
+{
+    try
+    {
+        if( IntegrationCommon::debug ) {
+            cout << "Starting activemqcms transactional test (sending "
+                 << IntegrationCommon::defaultMsgCount
+                 << " messages per type and sleeping "
+                 << IntegrationCommon::defaultDelay 
+                << " milli-seconds) ...\n"
+                << endl;
+        }
+        
+        // Create CMS Object for Comms
+        cms::Session* session = testSupport.getSession();
+        cms::Topic* topic = session->createTopic("mytopic");
+        cms::MessageConsumer* consumer = 
+            session->createConsumer( topic );            
+        // testSupport acts as the listener for everything the consumer receives.
+        consumer->setMessageListener( &testSupport );
+        cms::MessageProducer* producer = 
+            session->createProducer( topic );
+
+        // Send the first batch of text messages and commit it.
+        testSupport.produceTextMessages( 
+            *producer, IntegrationCommon::defaultMsgCount );
+            
+        session->commit();
+        
+        // Send a second batch of text messages and commit it.
+        // NOTE(review): the previous comment said "bytes messages" although the
+        // call produces text messages - confirm whether a bytes producer was
+        // intended here.
+        testSupport.produceTextMessages( 
+            *producer, IntegrationCommon::defaultMsgCount );
+        
+        session->commit();
+
+        // Wait till we get all the messages from both committed batches
+        testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 );
+        
+        unsigned int numReceived = testSupport.getNumReceived();
+        if( IntegrationCommon::debug ) {
+            // %u matches the unsigned counter.
+            printf("received: %u\n", numReceived );
+        }
+        CPPUNIT_ASSERT( 
+            numReceived == IntegrationCommon::defaultMsgCount * 2 );
+
+        // Reset the counter before the rollback phase.
+        testSupport.setNumReceived( 0 );
+
+        // Send some text messages. The producing helper lives on testSupport,
+        // exactly as in the two batches above.
+        testSupport.produceTextMessages( 
+            *producer, IntegrationCommon::defaultMsgCount );
+
+        session->rollback();
+
+        // Wait till we get all the messages
+        // NOTE(review): after the rollback the test still expects
+        // defaultMsgCount messages to arrive - confirm this is the intended
+        // behaviour for this connector.
+        testSupport.waitForMessages( IntegrationCommon::defaultMsgCount );
+
+        numReceived = testSupport.getNumReceived();
+        if( IntegrationCommon::debug ) {
+            printf("received: %u\n", numReceived );
+        }
+        CPPUNIT_ASSERT( 
+            numReceived == IntegrationCommon::defaultMsgCount );
+
+        if( IntegrationCommon::debug ) {
+            printf("Shutting Down\n" );
+        }
+        // Release the objects created by this test.
+        delete producer;                      
+        delete consumer;
+        delete topic;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h?view=auto&rev=516785
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h Sat Mar 10 13:21:22 2007
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_
+#define _INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_
+
+#include <integration/TestSupport.h>
+
+#include <cppunit/TestFixture.h>
+#include <cppunit/extensions/HelperMacros.h>
+
+namespace integration{
+namespace connector{
+namespace stomp{
+
+    /**
+     * CppUnit fixture for the Stomp connector transaction integration test.
+     * A single test case, test(), is registered with the suite.
+     */
+    class TransactionTest : public CppUnit::TestFixture
+    {
+        CPPUNIT_TEST_SUITE( TransactionTest );
+        CPPUNIT_TEST( test );
+        CPPUNIT_TEST_SUITE_END();
+
+    private:
+
+        // Helper object that the constructor and test() in
+        // TransactionTest.cpp initialize and use; it has to be declared here
+        // for that translation unit to compile.
+        TestSupport testSupport;
+
+    public:
+
+        TransactionTest();
+        virtual ~TransactionTest();
+
+        /** The one test case registered with the suite above. */
+        virtual void test();
+
+    };
+
+}}}
+
+#endif /*_INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_*/



Mime
View raw message