Author: tabish
Date: Tue May 8 20:22:41 2012
New Revision: 1335744
URL: http://svn.apache.org/viewvc?rev=1335744&view=rev
Log:
Adds new tests for temp destination handling now that we listen for advisory messages with
AdvisoryConsumer.
Modified:
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp?rev=1335744&r1=1335743&r2=1335744&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
Tue May 8 20:22:41 2012
@@ -21,11 +21,14 @@
#include <decaf/util/concurrent/Mutex.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/util/UUID.h>
+#include <decaf/util/ArrayList.h>
#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
using namespace std;
using namespace cms;
using namespace activemq;
+using namespace activemq::core;
using namespace activemq::test;
using namespace activemq::test::openwire;
using namespace activemq::util;
@@ -234,3 +237,186 @@ void OpenwireTempDestinationTest::testTw
// Shutdown the Requester.
requestorThread.join();
}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempDestOnlyConsumedByLocalConn() {
+
+ std::auto_ptr<ActiveMQConnectionFactory> factory(
+ new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+ factory->setAlwaysSyncSend(true);
+
+ std::auto_ptr<Connection> tempConnection(factory->createConnection());
+ tempConnection->start();
+ std::auto_ptr<Session> tempSession(tempConnection->createSession());
+ std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(tempSession->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ std::auto_ptr<TextMessage> message(tempSession->createTextMessage("First"));
+ producer->send(message.get());
+
+ // temp destination should not be consume when using another connection
+ std::auto_ptr<Connection> otherConnection(factory->createConnection());
+ std::auto_ptr<Session> otherSession(otherConnection->createSession());
+ std::auto_ptr<TemporaryQueue> otherQueue(otherSession->createTemporaryQueue());
+ std::auto_ptr<MessageConsumer> consumer(otherSession->createConsumer(otherQueue.get()));
+ std::auto_ptr<Message> msg(consumer->receive(3000));
+ CPPUNIT_ASSERT(msg.get() == NULL);
+
+ // should throw InvalidDestinationException when consuming a temp
+ // destination from another connection
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Should throw a CMS InvalidDestinationException",
+ otherSession->createConsumer(queue.get()),
+ InvalidDestinationException);
+
+ // should be able to consume temp destination from the same connection
+ consumer.reset(tempSession->createConsumer(queue.get()));
+ msg.reset(consumer->receive(3000));
+ CPPUNIT_ASSERT(msg.get() != NULL);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithConsumers() {
+
+ std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+ std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+ std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("Hello"));
+ producer->send(message.get());
+
+ std::auto_ptr<Message> message2(consumer->receive(3000));
+ CPPUNIT_ASSERT(message2.get() != NULL);
+ CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast<TextMessage*>(message2.get())
!= NULL);
+ CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText()
+ "'",
+ dynamic_cast<TextMessage*>(message2.get())->getText() == message->getText());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithoutConsumers() {
+
+ std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("Hello"));
+ producer->send(message.get());
+
+ std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+ std::auto_ptr<Message> message2(consumer->receive(3000));
+ CPPUNIT_ASSERT(message2.get() != NULL);
+ CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast<TextMessage*>(message2.get())
!= NULL);
+ CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText()
+ "'",
+ dynamic_cast<TextMessage*>(message2.get())->getText() == message->getText());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTmpQueueWorksUnderLoad() {
+
+ int count = 500;
+ int dataSize = 1024;
+
+ ArrayList<Pointer<BytesMessage> > list(count);
+ std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+ std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+
+ unsigned char data[1024];
+ for (int i = 0; i < dataSize; ++i) {
+ data[i] = 255;
+ }
+
+ for (int i = 0; i < count; i++) {
+ Pointer<BytesMessage> message(cmsProvider->getSession()->createBytesMessage());
+ message->writeBytes(data, 0, dataSize);
+ message->setIntProperty("c", i);
+ producer->send(message.get());
+ list.add(message);
+ }
+
+ std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+ for (int i = 0; i < count; i++) {
+ Pointer<Message> message2(consumer->receive(2000));
+ CPPUNIT_ASSERT(message2 != NULL);
+ CPPUNIT_ASSERT_EQUAL(i, message2->getIntProperty("c"));
+ CPPUNIT_ASSERT_MESSAGE("Expected message to be a BytesMessage", dynamic_cast<BytesMessage*>(message2.get())
!= NULL);
+ }
+
+ list.clear();
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testPublishFailsForClosedConnection() {
+
+ Pointer<ActiveMQConnectionFactory> factory(
+ new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+ factory->setAlwaysSyncSend(true);
+
+ std::auto_ptr<Connection> tempConnection(factory->createConnection());
+ tempConnection->start();
+
+ std::auto_ptr<Session> tempSession(tempConnection->createSession());
+ std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+
+ // This message delivery should work since the temp connection is still open.
+ std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("First"));
+ producer->send(message.get());
+ Thread::sleep(2000);
+
+ // Closing the connection should destroy the temp queue that was created.
+ tempConnection->close();
+ Thread::sleep(5000);
+
+ message.reset(cmsProvider->getSession()->createTextMessage("Hello"));
+
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Should throw a CMSException since temp destination should not exist anymore.",
+ producer->send(message.get()),
+ CMSException);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testPublishFailsForDestoryedTempDestination() {
+
+ Pointer<ActiveMQConnectionFactory> factory(
+ new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+ factory->setAlwaysSyncSend(true);
+
+ std::auto_ptr<Connection> tempConnection(factory->createConnection());
+ tempConnection->start();
+
+ std::auto_ptr<Session> tempSession(tempConnection->createSession());
+ std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+
+ // This message delivery should work since the temp connection is still open.
+ std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+ producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+ std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("First"));
+ producer->send(message.get());
+ Thread::sleep(2000);
+
+ // deleting the Queue will cause sends to fail
+ queue->destroy();
+ Thread::sleep(5000); // Wait a little bit to let the delete take effect.
+
+ message.reset(cmsProvider->getSession()->createTextMessage("Hello"));
+
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Should throw a CMSException since temp destination should not exist anymore.",
+ producer->send(message.get()),
+ CMSException);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testDeleteDestinationWithSubscribersFails() {
+
+ std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+ std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+
+ // This message delivery should NOT work since the temp connection is now closed.
+ CPPUNIT_ASSERT_THROW_MESSAGE(
+ "Should fail with CMSException as Subscribers are active",
+ queue->destroy(),
+ CMSException);
+}
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h?rev=1335744&r1=1335743&r2=1335744&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h
Tue May 8 20:22:41 2012
@@ -27,8 +27,15 @@ namespace openwire{
class OpenwireTempDestinationTest : public CMSTestFixture {
CPPUNIT_TEST_SUITE( OpenwireTempDestinationTest );
- CPPUNIT_TEST( testBasics );
- CPPUNIT_TEST( testTwoConnections );
+// CPPUNIT_TEST( testBasics );
+// CPPUNIT_TEST( testTwoConnections );
+// CPPUNIT_TEST( testTempDestOnlyConsumedByLocalConn );
+// CPPUNIT_TEST( testTempQueueHoldsMessagesWithConsumers );
+// CPPUNIT_TEST( testTempQueueHoldsMessagesWithoutConsumers );
+// CPPUNIT_TEST( testTmpQueueWorksUnderLoad );
+// CPPUNIT_TEST( testPublishFailsForClosedConnection );
+// CPPUNIT_TEST( testPublishFailsForDestoryedTempDestination );
+ CPPUNIT_TEST( testDeleteDestinationWithSubscribersFails );
CPPUNIT_TEST_SUITE_END();
public:
@@ -36,8 +43,15 @@ namespace openwire{
OpenwireTempDestinationTest() {}
virtual ~OpenwireTempDestinationTest() {}
- virtual void testBasics();
- virtual void testTwoConnections();
+ void testBasics();
+ void testTwoConnections();
+ void testTempDestOnlyConsumedByLocalConn();
+ void testTempQueueHoldsMessagesWithConsumers();
+ void testTempQueueHoldsMessagesWithoutConsumers();
+ void testTmpQueueWorksUnderLoad();
+ void testPublishFailsForClosedConnection();
+ void testPublishFailsForDestoryedTempDestination();
+ void testDeleteDestinationWithSubscribersFails();
virtual std::string getBrokerURL() const {
return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
|