Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id E621391CE for ; Tue, 8 May 2012 20:23:03 +0000 (UTC) Received: (qmail 69440 invoked by uid 500); 8 May 2012 20:23:03 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 69409 invoked by uid 500); 8 May 2012 20:23:03 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 69401 invoked by uid 99); 8 May 2012 20:23:03 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 May 2012 20:23:03 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 08 May 2012 20:23:01 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E265C23888CD for ; Tue, 8 May 2012 20:22:41 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1335744 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire: OpenwireTempDestinationTest.cpp OpenwireTempDestinationTest.h Date: Tue, 08 May 2012 20:22:41 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20120508202241.E265C23888CD@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Tue May 8 20:22:41 2012 New Revision: 1335744 URL: http://svn.apache.org/viewvc?rev=1335744&view=rev Log: Adds new tests for temp destination handling now that we listen for advisory messages with AdvisoryConsumer. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp?rev=1335744&r1=1335743&r2=1335744&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.cpp Tue May 8 20:22:41 2012 @@ -21,11 +21,14 @@ #include #include #include +#include #include +#include using namespace std; using namespace cms; using namespace activemq; +using namespace activemq::core; using namespace activemq::test; using namespace activemq::test::openwire; using namespace activemq::util; @@ -234,3 +237,186 @@ void OpenwireTempDestinationTest::testTw // Shutdown the Requester. requestorThread.join(); } + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testTempDestOnlyConsumedByLocalConn() { + + std::auto_ptr factory( + new ActiveMQConnectionFactory(cmsProvider->getBrokerURL())); + factory->setAlwaysSyncSend(true); + + std::auto_ptr tempConnection(factory->createConnection()); + tempConnection->start(); + std::auto_ptr tempSession(tempConnection->createSession()); + std::auto_ptr queue(tempSession->createTemporaryQueue()); + std::auto_ptr producer(tempSession->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + std::auto_ptr message(tempSession->createTextMessage("First")); + producer->send(message.get()); + + // temp destination should not be consume when using another connection + std::auto_ptr otherConnection(factory->createConnection()); + std::auto_ptr otherSession(otherConnection->createSession()); + std::auto_ptr otherQueue(otherSession->createTemporaryQueue()); + std::auto_ptr consumer(otherSession->createConsumer(otherQueue.get())); + std::auto_ptr msg(consumer->receive(3000)); + CPPUNIT_ASSERT(msg.get() == NULL); + + // should throw InvalidDestinationException when consuming a temp + // destination from another connection + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should throw a CMS InvalidDestinationException", + otherSession->createConsumer(queue.get()), + InvalidDestinationException); + + // should be able to consume temp destination from the same connection + consumer.reset(tempSession->createConsumer(queue.get())); + msg.reset(consumer->receive(3000)); + CPPUNIT_ASSERT(msg.get() != NULL); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithConsumers() { + + std::auto_ptr queue(cmsProvider->getSession()->createTemporaryQueue()); + std::auto_ptr consumer(cmsProvider->getSession()->createConsumer(queue.get())); + std::auto_ptr producer(cmsProvider->getSession()->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + std::auto_ptr message(cmsProvider->getSession()->createTextMessage("Hello")); + producer->send(message.get()); + + std::auto_ptr message2(consumer->receive(3000)); + CPPUNIT_ASSERT(message2.get() != NULL); + CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast(message2.get()) != NULL); + CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText() + "'", + dynamic_cast(message2.get())->getText() == message->getText()); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testTempQueueHoldsMessagesWithoutConsumers() { + + std::auto_ptr queue(cmsProvider->getSession()->createTemporaryQueue()); + std::auto_ptr producer(cmsProvider->getSession()->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + std::auto_ptr message(cmsProvider->getSession()->createTextMessage("Hello")); + producer->send(message.get()); + + std::auto_ptr consumer(cmsProvider->getSession()->createConsumer(queue.get())); + std::auto_ptr message2(consumer->receive(3000)); + CPPUNIT_ASSERT(message2.get() != NULL); + CPPUNIT_ASSERT_MESSAGE("Expected message to be a TextMessage", dynamic_cast(message2.get()) != NULL); + CPPUNIT_ASSERT_MESSAGE(std::string("Expected message to be a '") + message->getText() + "'", + dynamic_cast(message2.get())->getText() == message->getText()); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testTmpQueueWorksUnderLoad() { + + int count = 500; + int dataSize = 1024; + + ArrayList > list(count); + std::auto_ptr queue(cmsProvider->getSession()->createTemporaryQueue()); + std::auto_ptr producer(cmsProvider->getSession()->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + + unsigned char data[1024]; + for (int i = 0; i < dataSize; ++i) { + data[i] = 255; + } + + for (int i = 0; i < count; i++) { + Pointer message(cmsProvider->getSession()->createBytesMessage()); + message->writeBytes(data, 0, dataSize); + message->setIntProperty("c", i); + producer->send(message.get()); + list.add(message); + } + + std::auto_ptr consumer(cmsProvider->getSession()->createConsumer(queue.get())); + for (int i = 0; i < count; i++) { + Pointer message2(consumer->receive(2000)); + CPPUNIT_ASSERT(message2 != NULL); + CPPUNIT_ASSERT_EQUAL(i, message2->getIntProperty("c")); + CPPUNIT_ASSERT_MESSAGE("Expected message to be a BytesMessage", dynamic_cast(message2.get()) != NULL); + } + + list.clear(); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testPublishFailsForClosedConnection() { + + Pointer factory( + new ActiveMQConnectionFactory(cmsProvider->getBrokerURL())); + factory->setAlwaysSyncSend(true); + + std::auto_ptr tempConnection(factory->createConnection()); + tempConnection->start(); + + std::auto_ptr tempSession(tempConnection->createSession()); + std::auto_ptr queue(tempSession->createTemporaryQueue()); + + // This message delivery should work since the temp connection is still open. + std::auto_ptr producer(cmsProvider->getSession()->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + std::auto_ptr message(cmsProvider->getSession()->createTextMessage("First")); + producer->send(message.get()); + Thread::sleep(2000); + + // Closing the connection should destroy the temp queue that was created. + tempConnection->close(); + Thread::sleep(5000); + + message.reset(cmsProvider->getSession()->createTextMessage("Hello")); + + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should throw a CMSException since temp destination should not exist anymore.", + producer->send(message.get()), + CMSException); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testPublishFailsForDestoryedTempDestination() { + + Pointer factory( + new ActiveMQConnectionFactory(cmsProvider->getBrokerURL())); + factory->setAlwaysSyncSend(true); + + std::auto_ptr tempConnection(factory->createConnection()); + tempConnection->start(); + + std::auto_ptr tempSession(tempConnection->createSession()); + std::auto_ptr queue(tempSession->createTemporaryQueue()); + + // This message delivery should work since the temp connection is still open. + std::auto_ptr producer(cmsProvider->getSession()->createProducer(queue.get())); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + std::auto_ptr message(cmsProvider->getSession()->createTextMessage("First")); + producer->send(message.get()); + Thread::sleep(2000); + + // deleting the Queue will cause sends to fail + queue->destroy(); + Thread::sleep(5000); // Wait a little bit to let the delete take effect. + + message.reset(cmsProvider->getSession()->createTextMessage("Hello")); + + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should throw a CMSException since temp destination should not exist anymore.", + producer->send(message.get()), + CMSException); +} + +/////////////////////////////////////////////////////////////////////////////// +void OpenwireTempDestinationTest::testDeleteDestinationWithSubscribersFails() { + + std::auto_ptr queue(cmsProvider->getSession()->createTemporaryQueue()); + std::auto_ptr consumer(cmsProvider->getSession()->createConsumer(queue.get())); + + // This message delivery should NOT work since the temp connection is now closed. + CPPUNIT_ASSERT_THROW_MESSAGE( + "Should fail with CMSException as Subscribers are active", + queue->destroy(), + CMSException); +} Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h?rev=1335744&r1=1335743&r2=1335744&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireTempDestinationTest.h Tue May 8 20:22:41 2012 @@ -27,8 +27,15 @@ namespace openwire{ class OpenwireTempDestinationTest : public CMSTestFixture { CPPUNIT_TEST_SUITE( OpenwireTempDestinationTest ); - CPPUNIT_TEST( testBasics ); - CPPUNIT_TEST( testTwoConnections ); +// CPPUNIT_TEST( testBasics ); +// CPPUNIT_TEST( testTwoConnections ); +// CPPUNIT_TEST( testTempDestOnlyConsumedByLocalConn ); +// CPPUNIT_TEST( testTempQueueHoldsMessagesWithConsumers ); +// CPPUNIT_TEST( testTempQueueHoldsMessagesWithoutConsumers ); +// CPPUNIT_TEST( testTmpQueueWorksUnderLoad ); +// CPPUNIT_TEST( testPublishFailsForClosedConnection ); +// CPPUNIT_TEST( testPublishFailsForDestoryedTempDestination ); + CPPUNIT_TEST( testDeleteDestinationWithSubscribersFails ); CPPUNIT_TEST_SUITE_END(); public: @@ -36,8 +43,15 @@ namespace openwire{ OpenwireTempDestinationTest() {} virtual ~OpenwireTempDestinationTest() {} - virtual void testBasics(); - virtual void testTwoConnections(); + void testBasics(); + void testTwoConnections(); + void testTempDestOnlyConsumedByLocalConn(); + void testTempQueueHoldsMessagesWithConsumers(); + void testTempQueueHoldsMessagesWithoutConsumers(); + void testTmpQueueWorksUnderLoad(); + void testPublishFailsForClosedConnection(); + void testPublishFailsForDestoryedTempDestination(); + void testDeleteDestinationWithSubscribersFails(); virtual std::string getBrokerURL() const { return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();