Author: tabish Date: Sun Feb 3 10:25:17 2008 New Revision: 618049 URL: http://svn.apache.org/viewvc?rev=618049&view=rev Log: https://issues.apache.org/activemq/browse/AMQCPP-161 Adding calls to pull messages when needed and an integration test to make sure it works. Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=618049&r1=618048&r2=618049&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sun Feb 3 10:25:17 2008 @@ -217,6 +217,9 @@ "ActiveMQConsumer::receive - This Consumer is closed" ); } + // Send a request for a new message if needed + this->sendPullRequest( 0 ); + // Wait for the next message. ActiveMQMessage* msg = dequeue( -1 ); if( msg == NULL ) { @@ -253,6 +256,9 @@ "ActiveMQConsumer::receive - This Consumer is closed" ); } + // Send a request for a new message if needed + this->sendPullRequest( millisecs ); + // Wait for the next message. ActiveMQMessage* msg = dequeue( millisecs ); if( msg == NULL ) { @@ -288,6 +294,9 @@ __FILE__, __LINE__, "ActiveMQConsumer::receive - This Consumer is closed" ); } + + // Send a request for a new message if needed + this->sendPullRequest( -1 ); // Get the next available message, if there is one. ActiveMQMessage* msg = dequeue( 0 ); Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp?rev=618049&r1=618048&r2=618049&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp (original) +++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.cpp Sun Feb 3 10:25:17 2008 @@ -73,18 +73,16 @@ using namespace integration; using namespace integration::connector::openwire; -OpenwireSimpleTest::OpenwireSimpleTest() -{ +OpenwireSimpleTest::OpenwireSimpleTest() { } -OpenwireSimpleTest::~OpenwireSimpleTest() -{ +OpenwireSimpleTest::~OpenwireSimpleTest() { } -void OpenwireSimpleTest::testAutoAck() -{ - try - { +void OpenwireSimpleTest::testAutoAck() { + + try { + TestSupport testSupport(IntegrationCommon::getInstance().getOpenwireURL()); testSupport.initialize(); @@ -487,4 +485,48 @@ e.printStackTrace(); CPPUNIT_ASSERT( false ); } +} + +void OpenwireSimpleTest::testWithZeroConsumerPrefetch() { + + try { + TestSupport testSupport(IntegrationCommon::getInstance().getOpenwireURL(), cms::Session::CLIENT_ACKNOWLEDGE ); + testSupport.initialize(); + + if( IntegrationCommon::debug ) { + cout << "Starting activemqcms test (sending " + << IntegrationCommon::defaultMsgCount + << " messages per type and sleeping " + << IntegrationCommon::defaultDelay + << " milli-seconds) ...\n" + << endl; + } + + // Create CMS Object for Comms + cms::Session* session = testSupport.getSession(); + cms::Queue* queue = session->createQueue( + UUID::randomUUID().toString() + "?consumer.prefetchSize=0" ); + cms::MessageConsumer* consumer = session->createConsumer( queue ); + cms::MessageProducer* producer = session->createProducer( queue ); + + cms::TextMessage* textMsg = session->createTextMessage(); + + // Send some text messages + producer->send( textMsg ); + + delete textMsg; + + cms::Message* message = consumer->receive( 1000 ); + CPPUNIT_ASSERT( message != NULL ); + delete message; + + if( IntegrationCommon::debug ) { + printf("Shutting Down\n" ); + } + + delete producer; + delete consumer; + delete queue; + } + AMQ_CATCH_RETHROW( ActiveMQException ) } Modified: activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h?rev=618049&r1=618048&r2=618049&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h (original) +++ activemq/activemq-cpp/trunk/src/test-integration/integration/connector/openwire/OpenwireSimpleTest.h Sun Feb 3 10:25:17 2008 @@ -38,6 +38,7 @@ CPPUNIT_TEST( testMultipleSessions ); CPPUNIT_TEST( testReceiveAlreadyInQueue ); CPPUNIT_TEST( testQuickCreateAndDestroy ); + CPPUNIT_TEST( testWithZeroConsumerPrefetch ); CPPUNIT_TEST_SUITE_END(); public: @@ -53,6 +54,7 @@ virtual void testMultipleSessions(); virtual void testReceiveAlreadyInQueue(); virtual void testQuickCreateAndDestroy(); + virtual void testWithZeroConsumerPrefetch(); };