activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1335744 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire: OpenwireTempDestinationTest.cpp OpenwireTempDestinationTest.h
Date Tue, 08 May 2012 20:22:41 GMT
Author: tabish
Date: Tue May  8 20:22:41 2012
New Revision: 1335744

URL: http://svn.apache.org/viewvc?rev=1335744&view=rev
Log:
Adds new tests for temp destination handling now that we listen for advisory messages with
AdvisoryConsumer.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp?rev=1335744&r1=1335743&r2=1335744&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp Tue May  8 20:22:41 2012
@@ -21,11 +21,14 @@
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/CountDownLatch.h>
 #include <decaf/util/UUID.h>
+#include <decaf/util/ArrayList.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnectionFactory.h>
 
 using namespace std;
 using namespace cms;
 using namespace activemq;
+using namespace activemq::core;
 using namespace activemq::test;
 using namespace activemq::test::openwire;
 using namespace activemq::util;
@@ -234,3 +237,186 @@ void OpenwireTempDestinationTest::testTw
     // Shutdown the Requester.
     requestorThread.join();
 }
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempDestOnlyConsumedByLocalConn() {
+
+    std::auto_ptr<ActiveMQConnectionFactory> factory(
+        new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+    factory->setAlwaysSyncSend(true);
+
+    std::auto_ptr<Connection> tempConnection(factory->createConnection());
+    tempConnection->start();
+    std::auto_ptr<Session> tempSession(tempConnection->createSession());
+    std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+    std::auto_ptr<MessageProducer> producer(tempSession->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+    std::auto_ptr<TextMessage> message(tempSession->createTextMessage("First"));
+    producer->send(message.get());
+
+    // temp destination should not be consumed when using another connection
+    std::auto_ptr<Connection> otherConnection(factory->createConnection());
+    std::auto_ptr<Session> otherSession(otherConnection->createSession());
+    std::auto_ptr<TemporaryQueue> otherQueue(otherSession->createTemporaryQueue());
+    std::auto_ptr<MessageConsumer> consumer(otherSession->createConsumer(otherQueue.get()));
+    std::auto_ptr<Message> msg(consumer->receive(3000));
+    CPPUNIT_ASSERT(msg.get() == NULL);
+
+    // should throw InvalidDestinationException when consuming a temp
+    // destination from another connection
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should throw a CMS InvalidDestinationException",
+        otherSession->createConsumer(queue.get()),
+        InvalidDestinationException);
+
+    // should be able to consume temp destination from the same connection
+    consumer.reset(tempSession->createConsumer(queue.get()));
+    msg.reset(consumer->receive(3000));
+    CPPUNIT_ASSERT(msg.get() != NULL);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithConsumers() {
+
+    std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+    std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+    std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+    std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("Hello"));
+    producer->send(message.get());
+
+    std::auto_ptr<Message> message2(consumer->receive(3000));
+    CPPUNIT_ASSERT(message2.get() != NULL);
+    CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast<TextMessage*>(message2.get()) != NULL);
+    CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText() + "'",
+        dynamic_cast<TextMessage*>(message2.get())->getText() == message->getText());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithoutConsumers() {
+
+    std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+    std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+    std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("Hello"));
+    producer->send(message.get());
+
+    std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+    std::auto_ptr<Message> message2(consumer->receive(3000));
+    CPPUNIT_ASSERT(message2.get() != NULL);
+    CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast<TextMessage*>(message2.get()) != NULL);
+    CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText() + "'",
+        dynamic_cast<TextMessage*>(message2.get())->getText() == message->getText());
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testTmpQueueWorksUnderLoad() {
+
+    int count = 500;
+    int dataSize = 1024;
+
+    ArrayList<Pointer<BytesMessage> > list(count);
+    std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+    std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+
+    unsigned char data[1024];
+    for (int i = 0; i < dataSize; ++i) {
+        data[i] = 255;
+    }
+
+    for (int i = 0; i < count; i++) {
+        Pointer<BytesMessage> message(cmsProvider->getSession()->createBytesMessage());
+        message->writeBytes(data, 0, dataSize);
+        message->setIntProperty("c", i);
+        producer->send(message.get());
+        list.add(message);
+    }
+
+    std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+    for (int i = 0; i < count; i++) {
+        Pointer<Message> message2(consumer->receive(2000));
+        CPPUNIT_ASSERT(message2 != NULL);
+        CPPUNIT_ASSERT_EQUAL(i, message2->getIntProperty("c"));
+        CPPUNIT_ASSERT_MESSAGE("Expected message to be a BytesMessage", dynamic_cast<BytesMessage*>(message2.get()) != NULL);
+    }
+
+    list.clear();
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testPublishFailsForClosedConnection() {
+
+    Pointer<ActiveMQConnectionFactory> factory(
+        new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+    factory->setAlwaysSyncSend(true);
+
+    std::auto_ptr<Connection> tempConnection(factory->createConnection());
+    tempConnection->start();
+
+    std::auto_ptr<Session> tempSession(tempConnection->createSession());
+    std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+
+    // This message delivery should work since the temp connection is still open.
+    std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+    std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("First"));
+    producer->send(message.get());
+    Thread::sleep(2000);
+
+    // Closing the connection should destroy the temp queue that was created.
+    tempConnection->close();
+    Thread::sleep(5000);
+
+    message.reset(cmsProvider->getSession()->createTextMessage("Hello"));
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should throw a CMSException since temp destination should not exist anymore.",
+        producer->send(message.get()),
+        CMSException);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testPublishFailsForDestoryedTempDestination() {
+
+    Pointer<ActiveMQConnectionFactory> factory(
+        new ActiveMQConnectionFactory(cmsProvider->getBrokerURL()));
+    factory->setAlwaysSyncSend(true);
+
+    std::auto_ptr<Connection> tempConnection(factory->createConnection());
+    tempConnection->start();
+
+    std::auto_ptr<Session> tempSession(tempConnection->createSession());
+    std::auto_ptr<TemporaryQueue> queue(tempSession->createTemporaryQueue());
+
+    // This message delivery should work since the temp connection is still open.
+    std::auto_ptr<MessageProducer> producer(cmsProvider->getSession()->createProducer(queue.get()));
+    producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT);
+    std::auto_ptr<TextMessage> message(cmsProvider->getSession()->createTextMessage("First"));
+    producer->send(message.get());
+    Thread::sleep(2000);
+
+    // deleting the Queue will cause sends to fail
+    queue->destroy();
+    Thread::sleep(5000); // Wait a little bit to let the delete take effect.
+
+    message.reset(cmsProvider->getSession()->createTextMessage("Hello"));
+
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should throw a CMSException since temp destination should not exist anymore.",
+        producer->send(message.get()),
+        CMSException);
+}
+
+///////////////////////////////////////////////////////////////////////////////
+void OpenwireTempDestinationTest::testDeleteDestinationWithSubscribersFails() {
+
+    std::auto_ptr<TemporaryQueue> queue(cmsProvider->getSession()->createTemporaryQueue());
+    std::auto_ptr<MessageConsumer> consumer(cmsProvider->getSession()->createConsumer(queue.get()));
+
+    // Destroying the temp queue should fail while a consumer is still subscribed to it.
+    CPPUNIT_ASSERT_THROW_MESSAGE(
+        "Should fail with CMSException as Subscribers are active",
+        queue->destroy(),
+        CMSException);
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h?rev=1335744&r1=1335743&r2=1335744&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h Tue May  8 20:22:41 2012
@@ -27,8 +27,15 @@ namespace openwire{
     class OpenwireTempDestinationTest : public CMSTestFixture {
 
         CPPUNIT_TEST_SUITE( OpenwireTempDestinationTest );
-        CPPUNIT_TEST( testBasics );
-        CPPUNIT_TEST( testTwoConnections );
+//        CPPUNIT_TEST( testBasics );
+//        CPPUNIT_TEST( testTwoConnections );
+//        CPPUNIT_TEST( testTempDestOnlyConsumedByLocalConn );
+//        CPPUNIT_TEST( testTempQueueHoldsMessagesWithConsumers );
+//        CPPUNIT_TEST( testTempQueueHoldsMessagesWithoutConsumers );
+//        CPPUNIT_TEST( testTmpQueueWorksUnderLoad );
+//        CPPUNIT_TEST( testPublishFailsForClosedConnection );
+//        CPPUNIT_TEST( testPublishFailsForDestoryedTempDestination );
+        CPPUNIT_TEST( testDeleteDestinationWithSubscribersFails );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -36,8 +43,15 @@ namespace openwire{
         OpenwireTempDestinationTest() {}
         virtual ~OpenwireTempDestinationTest() {}
 
-        virtual void testBasics();
-        virtual void testTwoConnections();
+        void testBasics();
+        void testTwoConnections();
+        void testTempDestOnlyConsumedByLocalConn();
+        void testTempQueueHoldsMessagesWithConsumers();
+        void testTempQueueHoldsMessagesWithoutConsumers();
+        void testTmpQueueWorksUnderLoad();
+        void testPublishFailsForClosedConnection();
+        void testPublishFailsForDestoryedTempDestination();
+        void testDeleteDestinationWithSubscribersFails();
 
         virtual std::string getBrokerURL() const {
             return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();



Mime
View raw message