Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id 73D5C75D8 for ; Sat, 5 Nov 2011 21:46:49 +0000 (UTC) Received: (qmail 54927 invoked by uid 500); 5 Nov 2011 21:46:49 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 54872 invoked by uid 500); 5 Nov 2011 21:46:48 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 54865 invoked by uid 99); 5 Nov 2011 21:46:48 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Nov 2011 21:46:48 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 05 Nov 2011 21:46:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 1767C2388860 for ; Sat, 5 Nov 2011 21:46:22 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1198085 - in /activemq/activemq-cpp/trunk/activemq-cpp: ./ src/test-integration/ src/test-integration/activemq/test/openwire/ Date: Sat, 05 Nov 2011 21:46:21 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20111105214622.1767C2388860@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Sat Nov 5 21:46:21 2011 New Revision: 1198085 URL: http://svn.apache.org/viewvc?rev=1198085&view=rev Log: tests for: https://issues.apache.org/jira/browse/AMQCPP-383 Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp (with props) activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h (with props) Modified: activemq/activemq-cpp/trunk/activemq-cpp/build.sh activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Modified: activemq/activemq-cpp/trunk/activemq-cpp/build.sh URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/build.sh?rev=1198085&r1=1198084&r2=1198085&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/build.sh (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/build.sh Sat Nov 5 21:46:21 2011 @@ -1,4 +1,4 @@ -#!/bin/sh -e +#!/bin/bash -e # ------------------------------------------------------------------------ # Licensed to the Apache Software Foundation (ASF) under one or more # contributor license agreements. See the NOTICE file distributed with Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=1198085&r1=1198084&r2=1198085&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Sat Nov 5 21:46:21 2011 @@ -39,6 +39,7 @@ cc_sources = \ activemq/test/openwire/OpenwireExpirationTest.cpp \ activemq/test/openwire/OpenwireIndividualAckTest.cpp \ activemq/test/openwire/OpenwireJmsMessageGroupsTest.cpp \ + activemq/test/openwire/OpenwireJmsRecoverTest.cpp \ activemq/test/openwire/OpenwireMapMessageTest.cpp \ activemq/test/openwire/OpenwireMessageCompressionTest.cpp \ activemq/test/openwire/OpenwireMessagePriorityTest.cpp \ @@ -90,6 +91,7 @@ h_sources = \ activemq/test/openwire/OpenwireExpirationTest.h \ activemq/test/openwire/OpenwireIndividualAckTest.h \ activemq/test/openwire/OpenwireJmsMessageGroupsTest.h \ + activemq/test/openwire/OpenwireJmsRecoverTest.h \ activemq/test/openwire/OpenwireMapMessageTest.h \ activemq/test/openwire/OpenwireMessageCompressionTest.h \ activemq/test/openwire/OpenwireMessagePriorityTest.h \ Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=1198085&r1=1198084&r2=1198085&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Sat Nov 5 21:46:21 2011 @@ -22,6 +22,8 @@ #include "activemq/test/openwire/OpenwireDurableTest.h" #include "activemq/test/openwire/OpenwireExpirationTest.h" #include "activemq/test/openwire/OpenwireIndividualAckTest.h" +#include "activemq/test/openwire/OpenwireJmsMessageGroupsTest.h" +#include "activemq/test/openwire/OpenwireJmsRecoverTest.h" #include "activemq/test/openwire/OpenwireMessageCompressionTest.h" #include "activemq/test/openwire/OpenwireMessagePriorityTest.h" #include "activemq/test/openwire/OpenwireMapMessageTest.h" @@ -31,7 +33,6 @@ #include "activemq/test/openwire/OpenwireTransactionTest.h" #include "activemq/test/openwire/OpenwireTempDestinationTest.h" #include "activemq/test/openwire/OpenwireSlowListenerTest.h" -#include "activemq/test/openwire/OpenwireJmsMessageGroupsTest.h" #include "activemq/test/openwire/OpenwireVirtualTopicTest.h" #include "activemq/test/openwire/OpenwireXATransactionsTest.h" @@ -53,6 +54,8 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireDurableTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest ); +CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsMessageGroupsTest ); +CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsRecoverTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest ); @@ -62,7 +65,6 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSlowListenerTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTempDestinationTest ); -CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireJmsMessageGroupsTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireVirtualTopicTest ); CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireXATransactionsTest ); Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp?rev=1198085&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp Sat Nov 5 21:46:21 2011 @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "OpenwireJmsRecoverTest.h" + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +using namespace cms; +using namespace std; +using namespace decaf; +using namespace decaf::lang; +using namespace decaf::util; +using namespace decaf::util::concurrent; +using namespace activemq; +using namespace activemq::core; +using namespace activemq::commands; +using namespace activemq::exceptions; +using namespace activemq::test; +using namespace activemq::test::openwire; + +//////////////////////////////////////////////////////////////////////////////// +OpenwireJmsRecoverTest::OpenwireJmsRecoverTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +OpenwireJmsRecoverTest::~OpenwireJmsRecoverTest() { +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::setUp() { + + factory = ConnectionFactory::createCMSConnectionFactory(getBrokerURL()); + CPPUNIT_ASSERT(factory != NULL); + connection = factory->createConnection(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::tearDown() { + delete factory; + delete connection; + delete destination; +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testQueueSynchRecover() { + destination = new ActiveMQQueue(string("Queue-") + Long::toString(System::currentTimeMillis())); + doTestSynchRecover(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testQueueAsynchRecover() { + destination = new ActiveMQQueue(string("Queue-") + Long::toString(System::currentTimeMillis())); + doTestAsynchRecover(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testTopicSynchRecover() { + destination = new ActiveMQTopic(string("Topic-") + Long::toString(System::currentTimeMillis())); + doTestSynchRecover(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testTopicAsynchRecover() { + destination = new ActiveMQTopic(string("Topic-") + Long::toString(System::currentTimeMillis())); + doTestAsynchRecover(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testQueueAsynchRecoverWithAutoAck() { + destination = new ActiveMQQueue(string("Queue-") + Long::toString(System::currentTimeMillis())); + doTestAsynchRecoverWithAutoAck(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::testTopicAsynchRecoverWithAutoAck() { + destination = new ActiveMQTopic(string("Topic-") + Long::toString(System::currentTimeMillis())); + doTestAsynchRecoverWithAutoAck(); +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::doTestSynchRecover() { + + std::auto_ptr session(connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE)); + std::auto_ptr consumer(session->createConsumer(destination)); + connection->start(); + + std::auto_ptr producer(session->createProducer(destination)); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + producer->send(session->createTextMessage("First")); + producer->send(session->createTextMessage("Second")); + + std::auto_ptr message(dynamic_cast(consumer->receive(2000))); + CPPUNIT_ASSERT_EQUAL(string("First"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + message->acknowledge(); + + message.reset(dynamic_cast(consumer->receive(2000))); + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + + session->recover(); + + message.reset(dynamic_cast(consumer->receive(2000))); + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(message->getCMSRedelivered()); + + message->acknowledge(); +} + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class ClientAckMessageListener : public cms::MessageListener { + private: + + cms::Session* session; + std::vector* errorMessages; + CountDownLatch* doneCountDownLatch; + int counter; + + public: + + ClientAckMessageListener(cms::Session* session, std::vector* errorMessages, CountDownLatch* doneCountDownLatch) + : session(session), errorMessages(errorMessages), doneCountDownLatch(doneCountDownLatch), counter(0) { + } + + virtual ~ClientAckMessageListener() throw() { + } + + virtual void onMessage(const cms::Message* msg) throw() { + counter++; + try { + const TextMessage* message = dynamic_cast(msg); + switch (counter) { + case 1: + CPPUNIT_ASSERT_EQUAL(string("First"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + message->acknowledge(); + break; + case 2: + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + session->recover(); + break; + case 3: + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(message->getCMSRedelivered()); + message->acknowledge(); + doneCountDownLatch->countDown(); + break; + default: + errorMessages->push_back(string("Got too many messages: ") + Long::toString(counter)); + doneCountDownLatch->countDown(); + } + } catch (Exception& e) { + errorMessages->push_back(string("Got exception: ") + e.getMessage()); + doneCountDownLatch->countDown(); + } + } + }; +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::doTestAsynchRecover() { + + std::auto_ptr session(connection->createSession(cms::Session::CLIENT_ACKNOWLEDGE)); + std::vector errorMessages; + CountDownLatch doneCountDownLatch(1); + + std::auto_ptr consumer(session->createConsumer(destination)); + + std::auto_ptr producer(session->createProducer(destination)); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + producer->send(session->createTextMessage("First")); + producer->send(session->createTextMessage("Second")); + + ClientAckMessageListener listener(session.get(), &errorMessages, &doneCountDownLatch); + consumer->setMessageListener(&listener); + + connection->start(); + + if (doneCountDownLatch.await(5, TimeUnit::SECONDS)) { + if (!errorMessages.empty()) { + CPPUNIT_FAIL(errorMessages.front()); + } + } else { + CPPUNIT_FAIL("Timeout waiting for async message delivery to complete."); + } +} + +//////////////////////////////////////////////////////////////////////////////// +namespace { + + class AutoAckMessageListener : public cms::MessageListener { + private: + + cms::Session* session; + std::vector* errorMessages; + CountDownLatch* doneCountDownLatch; + int counter; + + public: + + AutoAckMessageListener(cms::Session* session, std::vector* errorMessages, CountDownLatch* doneCountDownLatch) + : session(session), errorMessages(errorMessages), doneCountDownLatch(doneCountDownLatch), counter(0) { + } + + virtual ~AutoAckMessageListener() throw() { + } + + virtual void onMessage(const cms::Message* msg) throw() { + counter++; + try { + const TextMessage* message = dynamic_cast(msg); + switch (counter) { + case 1: + CPPUNIT_ASSERT_EQUAL(string("First"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + break; + case 2: + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(!message->getCMSRedelivered()); + session->recover(); + break; + case 3: + CPPUNIT_ASSERT_EQUAL(string("Second"), message->getText()); + CPPUNIT_ASSERT(message->getCMSRedelivered()); + doneCountDownLatch->countDown(); + break; + default: + errorMessages->push_back(string("Got too many messages: ") + Long::toString(counter)); + doneCountDownLatch->countDown(); + } + } catch (Exception& e) { + errorMessages->push_back(string("Got exception: ") + e.getMessage()); + doneCountDownLatch->countDown(); + } + } + }; +} + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireJmsRecoverTest::doTestAsynchRecoverWithAutoAck() { + + std::auto_ptr session(connection->createSession(cms::Session::AUTO_ACKNOWLEDGE)); + std::vector errorMessages; + CountDownLatch doneCountDownLatch(1); + + std::auto_ptr consumer(session->createConsumer(destination)); + + std::auto_ptr producer(session->createProducer(destination)); + producer->setDeliveryMode(DeliveryMode::NON_PERSISTENT); + producer->send(session->createTextMessage("First")); + producer->send(session->createTextMessage("Second")); + + AutoAckMessageListener listener(session.get(), &errorMessages, &doneCountDownLatch); + consumer->setMessageListener(&listener); + + connection->start(); + + if (doneCountDownLatch.await(5, TimeUnit::SECONDS)) { + if (!errorMessages.empty()) { + CPPUNIT_FAIL(errorMessages.front()); + } + } else { + CPPUNIT_FAIL("Timeout waiting for async message delivery to complete."); + } +} Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.cpp ------------------------------------------------------------------------------ svn:eol-style = native Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h?rev=1198085&view=auto ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h (added) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h Sat Nov 5 21:46:21 2011 @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREJMSRECOVERTEST_H_ +#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREJMSRECOVERTEST_H_ + +#include +#include + +#include +#include +#include + +#include + +namespace activemq { +namespace test { +namespace openwire { + + class OpenwireJmsRecoverTest : public CppUnit::TestFixture { + private: + + CPPUNIT_TEST_SUITE( OpenwireJmsRecoverTest ); + CPPUNIT_TEST( testQueueSynchRecover ); + CPPUNIT_TEST( testQueueAsynchRecover ); + CPPUNIT_TEST( testTopicSynchRecover ); + CPPUNIT_TEST( testTopicAsynchRecover ); + CPPUNIT_TEST( testQueueAsynchRecoverWithAutoAck ); + CPPUNIT_TEST( testTopicAsynchRecoverWithAutoAck ); + CPPUNIT_TEST_SUITE_END(); + + cms::ConnectionFactory* factory; + cms::Connection* connection; + cms::Destination* destination; + + public: + + OpenwireJmsRecoverTest(); + virtual ~OpenwireJmsRecoverTest(); + + virtual std::string getBrokerURL() const { + return activemq::util::IntegrationCommon::getInstance().getOpenwireURL(); + } + + virtual void setUp(); + virtual void tearDown(); + + void testQueueSynchRecover(); + void testQueueAsynchRecover(); + void testTopicSynchRecover(); + void testTopicAsynchRecover(); + void testQueueAsynchRecoverWithAutoAck(); + void testTopicAsynchRecoverWithAutoAck(); + + private: + + void doTestSynchRecover(); + void doTestAsynchRecover(); + void doTestAsynchRecoverWithAutoAck(); + + }; + +}}} + +#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREJMSRECOVERTEST_H_ */ Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireJmsRecoverTest.h ------------------------------------------------------------------------------ svn:eol-style = native