Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 15918 invoked from network); 10 Mar 2007 21:21:51 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 10 Mar 2007 21:21:51 -0000 Received: (qmail 78882 invoked by uid 500); 10 Mar 2007 21:21:59 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 78863 invoked by uid 500); 10 Mar 2007 21:21:59 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 78837 invoked by uid 99); 10 Mar 2007 21:21:59 -0000 Received: from herse.apache.org (HELO herse.apache.org) (140.211.11.133) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Mar 2007 13:21:59 -0800 X-ASF-Spam-Status: No, hits=-99.5 required=10.0 tests=ALL_TRUSTED,NO_REAL_NAME X-Spam-Check-By: apache.org Received: from [140.211.11.3] (HELO eris.apache.org) (140.211.11.3) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 10 Mar 2007 13:21:46 -0800 Received: by eris.apache.org (Postfix, from userid 65534) id C2FD81A984A; Sat, 10 Mar 2007 13:21:25 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r516785 [4/4] - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration: ./ .deps/ integration/ integration/common/ integration/connector/ integration/connector/openwire/ integration/connector/stomp/ integration/durable/ integrati... Date: Sat, 10 Mar 2007 21:21:24 -0000 To: commits@activemq.apache.org From: nmittler@apache.org X-Mailer: svnmailer-1.1.0 Message-Id: <20070310212125.C2FD81A984A@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,276 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TestSupport.h" + +#include + +#include + +#include +#include +#include +#include + +#include +#include + +#include + +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::exceptions; +using namespace activemq::concurrent; + +using namespace integration; + +TestSupport::TestSupport( const string& brokerUrl, cms::Session::AcknowledgeMode ackMode ) + : connectionFactory( NULL ), + connection( NULL ) +{ + this->ackMode = ackMode; + this->brokerUrl = brokerUrl; +} + +TestSupport::~TestSupport() +{ + close(); +} + +void TestSupport::close() { + try + { + if( session != null ) { + session->close(); + delete session; + } + + if( connection != null ) { + connection->close(); + delete connection; + } + + if( connectionFactory != null ) { + delete connectionFactory; + } + } + AMQ_CATCH_NOTHROW( ActiveMQException ) + AMQ_CATCHALL_NOTHROW( ) + + session = null; + connection = null; + connectionFactory = null; +} + +void TestSupport::initialize(){ + try + { + numReceived = 0; + + // Now create the connection + connection = createDetachedConnection( + "", "", Guid().createGUIDString() ); + + // Set ourself as a recipient of Exceptions + connection->setExceptionListener( this ); + connection->start(); + + // Create a Session + session = connection->createSession( ackMode ); + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + +cms::Connection* TestSupport::createDetachedConnection( + const std::string& username, + const std::string& password, + const std::string& clientId ) { + + try + { + + if( connectionFactory == NULL ) { + // Create a Factory + connectionFactory = new ActiveMQConnectionFactory( brokerUrl ); + } + + // Now create the connection + cms::Connection* connection = connectionFactory->createConnection( + username, password, clientId ); + + return connection; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + +void TestSupport::doSleep(void) +{ + Thread::sleep( IntegrationCommon::defaultDelay ); +} + +unsigned int TestSupport::produceTextMessages( + cms::MessageProducer& producer, + unsigned int count ) +{ + try + { + // Send some text messages. + ostringstream stream; + string text = "this is a test text message: id = "; + + cms::TextMessage* textMsg = + session->createTextMessage(); + + unsigned int realCount = 0; + + for( unsigned int ix=0; ixsetText( stream.str().c_str() ); + stream.str(""); + producer.send( textMsg ); + doSleep(); + ++realCount; + } + + delete textMsg; + + return realCount; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + +unsigned int TestSupport::produceBytesMessages( + cms::MessageProducer& producer, + unsigned int count ) +{ + try + { + unsigned char buf[10]; + memset( buf, 0, 10 ); + buf[0] = 0; + buf[1] = 1; + buf[2] = 2; + buf[3] = 3; + buf[4] = 0; + buf[5] = 4; + buf[6] = 5; + buf[7] = 6; + + cms::BytesMessage* bytesMsg = + session->createBytesMessage(); + bytesMsg->setBodyBytes( buf, 10 ); + + unsigned int realCount = 0; + for( unsigned int ix=0; ix(message); + + if( txtMsg != NULL ) + { + std::string text = txtMsg->getText(); + + if( IntegrationCommon::debug ) { + printf("received text msg: %s\n", txtMsg->getText().c_str() ); + } + + numReceived++; + + // Signal that we got one + synchronized( &mutex ) + { + mutex.notifyAll(); + } + + return; + } + + // Got a bytes msg. + const cms::BytesMessage* bytesMsg = + dynamic_cast(message); + + if( bytesMsg != NULL ) + { + const unsigned char* bytes = bytesMsg->getBodyBytes(); + + string transcode( (const char*)bytes, bytesMsg->getBodyLength() ); + + if( IntegrationCommon::debug ) { + printf("received bytes msg: %s\n", transcode.c_str() ); + } + + numReceived++; + + // Signal that we got one + synchronized( &mutex ) + { + mutex.notifyAll(); + } + + return; + } +} Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/TestSupport.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,101 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_TESTSUPPORT_H_ +#define _INTEGRATION_TESTSUPPORT_H_ + +#include + +#include +#include +#include +#include +#include +#include + +namespace integration{ + + class TestSupport : public cms::ExceptionListener, public cms::MessageListener + { + public: + + TestSupport( const std::string& brokerUrl, + cms::Session::AcknowledgeMode ackMode = cms::Session::AUTO_ACKNOWLEDGE ); + virtual ~TestSupport(); + + virtual void initialize(); + virtual void close(); + + virtual activemq::concurrent::Mutex& getMutex() { + return mutex; + } + + virtual std::string getBrokerUrl() const { + return brokerUrl; + } + + virtual cms::Connection* getConnection() { + return connection; + } + + virtual cms::Session* getSession() { + return session; + } + + virtual unsigned int getNumReceived() const { + return numReceived; + } + + virtual void setNumReceived( unsigned int numReceived ) { + this->numReceived = numReceived; + } + + virtual void doSleep(); + + virtual unsigned int produceTextMessages( + cms::MessageProducer& producer, + unsigned int count ); + virtual unsigned int produceBytesMessages( + cms::MessageProducer& producer, + unsigned int count ); + + virtual void waitForMessages( unsigned int count ); + + virtual void onException( const cms::CMSException& error ); + virtual void onMessage( const cms::Message* message ); + + virtual cms::Connection* createDetachedConnection( + const std::string& username, + const std::string& password, + const std::string& clientId ); + + public: + + std::string brokerUrl; + cms::ConnectionFactory* connectionFactory; + cms::Connection* connection; + cms::Session* session; + + unsigned int numReceived; + activemq::concurrent::Mutex mutex; + cms::Session::AcknowledgeMode ackMode; + + }; + +} + +#endif /*_INTEGRATION_TESTSUPPORT_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,137 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "AsyncSender.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace integration; +using namespace integration::connector::stomp; + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::AsyncSenderTest ); + +//////////////////////////////////////////////////////////////////////////////// +AsyncSenderTest::AsyncSenderTest() +: + testSupport("stomp://localhost:61613?useAsyncSend=true") +{ + testSupport.initialize(); +} + +//////////////////////////////////////////////////////////////////////////////// +AsyncSenderTest::~AsyncSenderTest() +{ +} + +//////////////////////////////////////////////////////////////////////////////// +void AsyncSenderTest::test() +{ + try + { + if( IntegrationCommon::debug ) { + cout << "Starting activemqcms test (sending " + << IntegrationCommon::defaultMsgCount + << " messages per type and sleeping " + << IntegrationCommon::defaultDelay + << " milli-seconds) ...\n" + << endl; + } + + // Create CMS Object for Comms + cms::Session* session = testSupport.getSession(); + cms::Topic* topic = testSupport.getSession()->createTopic("mytopic"); + cms::MessageConsumer* consumer = + session->createConsumer( topic ); + consumer->setMessageListener( &testSupport ); + cms::MessageProducer* producer = + session->createProducer( topic ); + + // Send some text messages + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + // Send some bytes messages. + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + // Wait for the messages to get here + testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 ); + + unsigned int numReceived = testSupport.getNumReceived(); + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + CPPUNIT_ASSERT( + numReceived == IntegrationCommon::defaultMsgCount * 2 ); + + if( IntegrationCommon::debug ) { + printf("Shutting Down\n" ); + } + delete producer; + delete consumer; + delete topic; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/AsyncSenderTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_ +#define _INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_ + +#include + +#include +#include + +namespace integration{ +namespace connector{ +namespace stomp{ + + class AsyncSenderTest : public CppUnit::TestFixture { + + CPPUNIT_TEST_SUITE( AsyncSenderTest ); + CPPUNIT_TEST( test ); + CPPUNIT_TEST_SUITE_END(); + + private: + + TestSupport testSupport; + + public: + + AsyncSenderTest(); + virtual ~AsyncSenderTest(); + + virtual std::string getBrokerURL() const { + return common::IntegrationCommon::defaultURL + "?useAsyncSend=true"; + } + + virtual void test(); + + }; + +}}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_ASYNCSENDERTEST_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,154 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "DurableTester.h" + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::DurableTest ); + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace integration; +using namespace integration::connector::stomp; + +DurableTest::DurableTest() +: + testSupport("stomp://localhost:61613") +{ + testSupport.initialize(); +} + +DurableTest::~DurableTest() +{} + +void DurableTest::test() +{ + try + { + if( IntegrationCommon::debug ) { + cout << "Starting activemqcms durable test (sending " + << IntegrationCommon::defaultMsgCount + << " messages per type and sleeping " + << IntegrationCommon::defaultDelay + << " milli-seconds) ...\n" + << endl; + } + + std::string subName = Guid().createGUID(); + + // Create CMS Object for Comms + cms::Session* session = testSupport.getSession(); + cms::Topic* topic = session->createTopic("mytopic"); + cms::MessageConsumer* consumer = + session->createDurableConsumer( topic, subName, "" ); + consumer->setMessageListener( &testSupport ); + cms::MessageProducer* producer = + session->createProducer( topic ); + + unsigned int sent; + + // Send some text messages + sent = testSupport.produceTextMessages( *producer, 3 ); + + // Wait for all messages + testSupport.waitForMessages( sent ); + + unsigned int numReceived = testSupport.getNumReceived(); + + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + + CPPUNIT_ASSERT( numReceived == sent ); + + // Nuke the consumer + delete consumer; + + // Send some text messages + sent += testSupport.produceTextMessages( *producer, 3 ); + + consumer = session->createDurableConsumer( topic, subName, "" ); + consumer->setMessageListener( &testSupport ); + + // Send some text messages + sent += testSupport.produceTextMessages( *producer, 3 ); + + // Wait for all remaining messages + testSupport.waitForMessages( sent ); + + numReceived = testSupport.getNumReceived(); + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + CPPUNIT_ASSERT( numReceived == sent ); + + if( IntegrationCommon::debug ) { + printf("Shutting Down\n" ); + } + delete producer; + delete consumer; + delete topic; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/DurableTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_ +#define _INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_ + +#include + +#include +#include + +namespace integration{ +namespace connector{ +namespace stomp{ + + class DurableTest : public CppUnit::TestFixture + { + CPPUNIT_TEST_SUITE( DurableTest ); + CPPUNIT_TEST( test ); + CPPUNIT_TEST_SUITE_END(); + + private: + + TestSupport testSupport; + + public: + + DurableTest(); + virtual ~DurableTest(); + + virtual void test(); + + }; + +}}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_DURABLETESTER_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,350 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "ExpirationTest.h" + + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::ExpirationTest ); + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace std; +using namespace integration; +using namespace integration::connector::stomp; + +std::string messageTag = Guid().createGUID(); + +class Producer : public Runnable { +private: + + ActiveMQConnectionFactory* connectionFactory; + Connection* connection; + Session* session; + Topic* destination; + MessageProducer* producer; + int numMessages; + long long timeToLive; + bool disableTimeStamps; + +public: + + Producer( int numMessages, long long timeToLive ){ + connection = NULL; + session = NULL; + destination = NULL; + producer = NULL; + this->numMessages = numMessages; + this->timeToLive = timeToLive; + this->disableTimeStamps = false; + } + + virtual ~Producer(){ + cleanup(); + } + + virtual bool getDisableTimeStamps() const { + return disableTimeStamps; + } + + virtual void setDisableTimeStamps( bool value ) { + this->disableTimeStamps = value; + } + + virtual void run() { + try { + // Create a ConnectionFactory + connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61613"); + + // Create a Connection + connection = connectionFactory->createConnection(); + connection->start(); + + string sss=connection->getClientId(); + cout << sss << endl; + + session = connection->createSession( Session::AUTO_ACKNOWLEDGE); + destination = session->createTopic( "expirationTopic" ); + + producer = session->createProducer( destination ); + producer->setDeliveryMode( DeliveryMode::PERSISTENT ); + producer->setDisableMessageTimeStamp( disableTimeStamps ); + + //unsigned long ttt=getcurt(); + producer->setTimeToLive( 1); + + // Create the Thread Id String + string threadIdStr = Integer::toString( Thread::getId() ); + + // Create a messages + string text = (string)"Hello world! from thread " + threadIdStr; + + for( int ix=0; ixcreateTextMessage( text ); + message->setStringProperty( "messageTag", messageTag ); + producer->send( message ); + delete message; + } + + }catch ( CMSException& e ) { + e.printStackTrace(); + } + } + +private: + + void cleanup(){ + + // Destroy resources. + try{ + if( destination != NULL ) delete destination; + }catch ( CMSException& e ) {} + destination = NULL; + + try{ + if( producer != NULL ) delete producer; + }catch ( CMSException& e ) {} + producer = NULL; + + // Close open resources. + try{ + if( session != NULL ) session->close(); + if( connection != NULL ) connection->close(); + }catch ( CMSException& e ) {} + + try{ + if( session != NULL ) delete session; + }catch ( CMSException& e ) {} + session = NULL; + + try{ + if( connection != NULL ) delete connection; + }catch ( CMSException& e ) {} + connection = NULL; + + try{ + if( connectionFactory != NULL ) delete connectionFactory; + }catch ( CMSException& e ) {} + connectionFactory = NULL; + } +}; + +class Consumer : public MessageListener, public Runnable { + +private: + + Connection* connection; + Session* session; + Topic* destination; + MessageConsumer* consumer; + long waitMillis; + int numReceived; + +public: + + Consumer( long waitMillis ){ + connection = NULL; + session = NULL; + destination = NULL; + consumer = NULL; + this->waitMillis = waitMillis; + numReceived = 0; + } + + virtual ~Consumer(){ + cleanup(); + } + + virtual int getNumReceived() const{ + return numReceived; + } + + virtual void run() { + + try { + + string user,passwd,sID; + user="default"; + passwd=""; + sID="lsgID"; + + // Create a ConnectionFactory + ActiveMQConnectionFactory* connectionFactory = + new ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); + + // Create a Connection + connection = connectionFactory->createConnection(); + delete connectionFactory; + connection->start(); + + // Create a Session + session = connection->createSession( Session::AUTO_ACKNOWLEDGE); + + // Create the destination (Topic or Queue) + destination = session->createTopic( "expirationTopic?consumer.retroactive=true"); + + consumer = session->createConsumer( destination ); + + consumer->setMessageListener( this ); + + // Sleep while asynchronous messages come in. + Thread::sleep( waitMillis ); + + } catch (CMSException& e) { + e.printStackTrace(); + } + } + + virtual void onMessage( const Message* message ){ + + try + { + if( !message->propertyExists( "messageTag" ) || + message->getStringProperty("messageTag") != messageTag ){ + return; + } + + const TextMessage* textMessage = + dynamic_cast< const TextMessage* >( message ); + string text = textMessage->getText(); + numReceived++; + } catch (CMSException& e) { + e.printStackTrace(); + } + } + +private: + + void cleanup(){ + + // Destroy resources. + try{ + if( destination != NULL ) delete destination; + }catch (CMSException& e) {} + destination = NULL; + + try{ + if( consumer != NULL ) delete consumer; + }catch (CMSException& e) {} + consumer = NULL; + + // Close open resources. + try{ + if( session != NULL ) session->close(); + if( connection != NULL ) connection->close(); + }catch (CMSException& e) {} + + try{ + if( session != NULL ) delete session; + }catch (CMSException& e) {} + session = NULL; + + try{ + if( connection != NULL ) delete connection; + }catch (CMSException& e) {} + connection = NULL; + } +}; + +void ExpirationTest::testExpired() +{ + Producer producer( 1, 1 ); + Thread producerThread( &producer ); + producerThread.start(); + producerThread.join(); + + Thread::sleep( 100 ); + + Consumer consumer( 2000 ); + Thread consumerThread( &consumer ); + consumerThread.start(); + consumerThread.join(); + + Thread::sleep( 100 ); + + CPPUNIT_ASSERT( consumer.getNumReceived() == 0 ); +} + +void ExpirationTest::testNotExpired() +{ + Producer producer( 2, 2000 ); + producer.setDisableTimeStamps( true ); + Thread producerThread( &producer ); + producerThread.start(); + producerThread.join(); + + Consumer consumer( 3000 ); + Thread consumerThread( &consumer ); + consumerThread.start(); + consumerThread.join(); + + Thread::sleep( 50 ); + + CPPUNIT_ASSERT( consumer.getNumReceived() == 2 ); +} + Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/ExpirationTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,52 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_ +#define _INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_ + +#include +#include + +#include +#include +#include +#include +#include +#include + +namespace integration{ +namespace expiration{ + + class ExpirationTest : public CppUnit::TestFixture + { + CPPUNIT_TEST_SUITE( ExpirationTest ); + CPPUNIT_TEST( testExpired ); + CPPUNIT_TEST( testNotExpired ); + CPPUNIT_TEST_SUITE_END(); + + public: + + ExpirationTest(){} + virtual ~ExpirationTest(){} + + virtual void testExpired(); + virtual void testNotExpired(); + }; + +}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_EXPIRATIONTEST_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SimpleRollbackTest.h" + +#include + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::SimpleRollbackTest ); + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace std; +using namespace integration; +using namespace integration::connector::stomp; + +SimpleRollbackTest::SimpleRollbackTest() +{ + try + { + string url = "stomp://localhost:61613"; + numReceived = 0; + + // Default amount to send and receive + msgCount = 1; + + // Create a Factory + connectionFactory = new ActiveMQConnectionFactory( url ); + + // Now create the connection + connection = connectionFactory->createConnection( + "", "", Guid().createGUIDString() ); + + // Set ourself as a recipient of Exceptions + connection->setExceptionListener( this ); + connection->start(); + + // Create a Session + session = connection->createSession( + cms::Session::SESSION_TRANSACTED ); + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + +SimpleRollbackTest::~SimpleRollbackTest() +{ + try + { + session->close(); + connection->close(); + + delete session; + delete connection; + delete connectionFactory; + } + AMQ_CATCH_NOTHROW( ActiveMQException ) + AMQ_CATCHALL_NOTHROW( ) +} + +void SimpleRollbackTest::test() +{ + try + { + // Create CMS Object for Comms + cms::Topic* topic = session->createTopic("mytopic"); + cms::MessageConsumer* consumer = + session->createConsumer( topic ); + consumer->setMessageListener( this ); + cms::MessageProducer* producer = + session->createProducer( topic ); + + cms::TextMessage* textMsg = + session->createTextMessage(); + + for( size_t ix = 0; ix < msgCount; ++ix ) + { + ostringstream lcStream; + lcStream << "SimpleTest - Message #" << ix << ends; + textMsg->setText( lcStream.str() ); + producer->send( textMsg ); + } + + delete textMsg; + + Thread::sleep( 100 ); + + session->commit(); + + textMsg = session->createTextMessage(); + + for( size_t ix = 0; ix < msgCount; ++ix ) + { + ostringstream lcStream; + lcStream << "SimpleTest - Message #" << ix << ends; + textMsg->setText( lcStream.str() ); + producer->send( textMsg ); + } + + delete textMsg; + + Thread::sleep( 500 ); + + session->rollback(); + + Thread::sleep( 500 ); + + textMsg = session->createTextMessage(); + textMsg->setText( "SimpleTest - Message after Rollback" ); + producer->send( textMsg ); + delete textMsg; + + Thread::sleep( 15000 ); + + CPPUNIT_ASSERT( true ); + + textMsg = session->createTextMessage(); + textMsg->setText( "SimpleTest - Message after Rollback" ); + producer->send( textMsg ); + delete textMsg; + + if( IntegrationCommon::debug ) { + printf( "Shutting Down\n" ); + } + + delete producer; + delete consumer; + delete topic; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + +void SimpleRollbackTest::onException( const cms::CMSException& error AMQCPP_UNUSED) +{ + bool AbstractTester = false; + CPPUNIT_ASSERT( AbstractTester ); +} + +void SimpleRollbackTest::onMessage( const cms::Message* message ) +{ + try + { + // Got a text message. + const cms::TextMessage* txtMsg = + dynamic_cast(message); + + if( txtMsg != NULL ) + { + if( IntegrationCommon::debug ) { + printf("received text msg: %s\n", txtMsg->getText().c_str() ); + } + + numReceived++; + + // Signal that we got one + synchronized( &mutex ) + { + mutex.notifyAll(); + } + + return; + } + + // Got a bytes msg. + const cms::BytesMessage* bytesMsg = + dynamic_cast(message); + + if( bytesMsg != NULL ) + { + const unsigned char* bytes = bytesMsg->getBodyBytes(); + + string transcode( (const char*)bytes, bytesMsg->getBodyLength() ); + + if( IntegrationCommon::debug ) { + printf("Received Bytes Message: %s", transcode.c_str() ); + } + + numReceived++; + + // Signal that we got one + synchronized( &mutex ) + { + mutex.notifyAll(); + } + + return; + } + } + AMQ_CATCH_NOTHROW( ActiveMQException ) + AMQ_CATCHALL_NOTHROW( ) +} Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleRollbackTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_ +#define _INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_ + +#include +#include + +#include + +#include +#include +#include +#include +#include +#include + +namespace integration{ +namespace connector{ +namespace stomp{ + + class SimpleRollbackTest : public CppUnit::TestFixture, + public cms::ExceptionListener, + public cms::MessageListener + { + CPPUNIT_TEST_SUITE( SimpleRollbackTest ); + CPPUNIT_TEST( test ); + CPPUNIT_TEST_SUITE_END(); + + public: + + SimpleRollbackTest(); + virtual ~SimpleRollbackTest(); + + virtual void test(void); + + virtual void onException( const cms::CMSException& error ); + virtual void onMessage( const cms::Message* message ); + + private: + + cms::ConnectionFactory* connectionFactory; + cms::Connection* connection; + cms::Session* session; + + unsigned int numReceived; + unsigned int msgCount; + activemq::concurrent::Mutex mutex; + + }; + +}}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_SIMPLEROLLBACKTEST_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "SimpleTester.h" +#include + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::SimpleTester ); + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace integration; +using namespace integration::connector::stomp; + +SimpleTester::SimpleTester() +: + testSupport( "stomp://localhost:61613" ) +{ + testSupport.initialize(); +} + +SimpleTester::~SimpleTester() +{ +} + +void SimpleTester::test() +{ + try + { + if( IntegrationCommon::debug ) { + cout << "Starting activemqcms test (sending " + << IntegrationCommon::defaultMsgCount + << " messages per type and sleeping " + << IntegrationCommon::defaultDelay + << " milli-seconds) ...\n" + << endl; + } + + // Create CMS Object for Comms + cms::Session* session = testSupport.getSession(); + cms::Topic* topic = session->createTopic("mytopic"); + cms::MessageConsumer* consumer = + session->createConsumer( topic ); + consumer->setMessageListener( &testSupport ); + cms::MessageProducer* producer = + session->createProducer( topic ); + + // Send some text messages + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + // Send some bytes messages. + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + // Wait for the messages to get here + testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 ); + + unsigned int numReceived = testSupport.getNumReceived(); + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + CPPUNIT_ASSERT( + numReceived == IntegrationCommon::defaultMsgCount * 2 ); + + if( IntegrationCommon::debug ) { + printf("Shutting Down\n" ); + } + delete producer; + delete consumer; + delete topic; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/SimpleTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_ +#define _INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_ + +#include +#include + +#include + +namespace integration{ +namespace connector{ +namespace stomp{ + + class SimpleTest : public CppUnit::TestFixture + { + CPPUNIT_TEST_SUITE( SimpleTest ); + CPPUNIT_TEST( test ); + CPPUNIT_TEST_SUITE_END(); + + private: + + TestSupport testSupport; + + public: + + SimpleTest(); + virtual ~SimpleTest(); + + virtual void test(void); + + }; + +}}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_SIMPLETESTER_H_*/ Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.cpp Sat Mar 10 13:21:22 2007 @@ -0,0 +1,157 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "TransactionTest.h" +#include + +CPPUNIT_TEST_SUITE_REGISTRATION( integration::connector::stomp::TransactionTest ); + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +using namespace activemq::connector::stomp; +using namespace activemq::transport; +using namespace activemq::util; +using namespace std; +using namespace cms; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::util; +using namespace activemq::connector; +using namespace activemq::exceptions; +using namespace activemq::network; +using namespace activemq::transport; +using namespace activemq::concurrent; + +using namespace integration; +using namespace integration::connector::stomp; + +TransactionTest::TransactionTest() +: + testSupport( "stomp://localhost:61613", cms::Session::SESSION_TRANSACTED ) +{ + testSupport.initialize(); +} + +TransactionTest::~TransactionTest() +{} + +void TransactionTest::test() +{ + try + { + if( IntegrationCommon::debug ) { + cout << "Starting activemqcms transactional test (sending " + << IntegrationCommon::defaultMsgCount + << " messages per type and sleeping " + << IntegrationCommon::defaultDelay + << " milli-seconds) ...\n" + << endl; + } + + // Create CMS Object for Comms + cms::Session* session = testSupport.getSession(); + cms::Topic* topic = session->createTopic("mytopic"); + cms::MessageConsumer* consumer = + session->createConsumer( topic ); + consumer->setMessageListener( &testSupport ); + cms::MessageProducer* producer = + session->createProducer( topic ); + + // Send some text messages + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + session->commit(); + + // Send some bytes messages. + testSupport.produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + session->commit(); + + // Wait till we get all the messages + testSupport.waitForMessages( IntegrationCommon::defaultMsgCount * 2 ); + + unsigned int numReceived = testSupport.getNumReceived(); + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + CPPUNIT_ASSERT( + numReceived == IntegrationCommon::defaultMsgCount * 2 ); + + testSupport.setNumReceived( 0 ); + + // Send some text messages + this->produceTextMessages( + *producer, IntegrationCommon::defaultMsgCount ); + + session->rollback(); + + // Wait till we get all the messages + testSupport.waitForMessages( IntegrationCommon::defaultMsgCount ); + + numReceived = testSupport.getNumReceived(); + if( IntegrationCommon::debug ) { + printf("received: %d\n", numReceived ); + } + CPPUNIT_ASSERT( + numReceived == IntegrationCommon::defaultMsgCount ); + + if( IntegrationCommon::debug ) { + printf("Shutting Down\n" ); + } + delete producer; + delete consumer; + delete topic; + } + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} + Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h?view=auto&rev=516785 ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/integration/connector/stomp/TransactionTest.h Sat Mar 10 13:21:22 2007 @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_ +#define _INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_ + +#include + +#include +#include + +namespace integration{ +namespace connector{ +namespace stomp{ + + class TransactionTest : public CppUnit::TestFixture + { + CPPUNIT_TEST_SUITE( TransactionTest ); + CPPUNIT_TEST( test ); + CPPUNIT_TEST_SUITE_END(); + + public: + + TransactionTest(); + virtual ~TransactionTest(); + + virtual void test(); + + }; + +}}} + +#endif /*_INTEGRATION_CONNECTOR_STOMP_TRANSACTIONTESTER_H_*/