activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r923787 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ main/cms/ test-integration/ test-integration/activemq/test/ test-integration/activemq/test/openwire/ test/activemq/core/
Date Tue, 16 Mar 2010 14:56:20 GMT
Author: tabish
Date: Tue Mar 16 14:56:19 2010
New Revision: 923787

URL: http://svn.apache.org/viewvc?rev=923787&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-129

Implement the QueueBrowser for CMS.  Test cases added

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/QueueBrowserTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h   (with props)
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/QueueBrowser.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionFactoryTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQSessionTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Tue Mar 16 14:56:19 2010
@@ -91,6 +91,7 @@ cc_sources = \
     activemq/core/ActiveMQConstants.cpp \
     activemq/core/ActiveMQConsumer.cpp \
     activemq/core/ActiveMQProducer.cpp \
+    activemq/core/ActiveMQQueueBrowser.cpp \
     activemq/core/ActiveMQSession.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
     activemq/core/ActiveMQTransactionContext.cpp \
@@ -690,6 +691,7 @@ h_sources = \
     activemq/core/ActiveMQConstants.h \
     activemq/core/ActiveMQConsumer.h \
     activemq/core/ActiveMQProducer.h \
+    activemq/core/ActiveMQQueueBrowser.h \
     activemq/core/ActiveMQSession.h \
     activemq/core/ActiveMQSessionExecutor.h \
     activemq/core/ActiveMQTransactionContext.h \
@@ -1108,6 +1110,7 @@ h_sources = \
     cms/Message.h \
     cms/MessageConsumer.h \
     cms/MessageEOFException.h \
+    cms/MessageEnumeration.h \
     cms/MessageFormatException.h \
     cms/MessageListener.h \
     cms/MessageNotReadableException.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Tue Mar 16 14:56:19 2010
@@ -191,7 +191,7 @@ void ActiveMQConnection::addProducer( Ac
 
         // Add this producer from the set of active consumer.
         synchronized( &activeProducers ) {
-            activeProducers.put( producer->getProducerInfo().getProducerId(), producer );
+            activeProducers.put( producer->getProducerInfo()->getProducerId(), producer );
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -461,14 +461,6 @@ void ActiveMQConnection::onCommand( cons
             // Check first to see if we are recovering.
             waitForTransportInterruptionProcessingToComplete();
 
-            // Check for an empty Message, shouldn't ever happen but who knows.
-            if( dispatch->getMessage() == NULL ) {
-                throw ActiveMQException(
-                    __FILE__, __LINE__,
-                    "ActiveMQConnection::onCommand - "
-                    "Received unsupported dispatch message" );
-            }
-
             // Look up the dispatcher.
             Dispatcher* dispatcher = NULL;
             synchronized( &dispatchers ) {
@@ -479,9 +471,14 @@ void ActiveMQConnection::onCommand( cons
                 // just closed.
                 if( dispatcher != NULL ) {
 
-                    dispatch->getMessage()->setReadOnlyBody( true );
-                    dispatch->getMessage()->setReadOnlyProperties( true );
-                    dispatch->getMessage()->setRedeliveryCounter( dispatch->getRedeliveryCounter() );
+                    Pointer<commands::Message> message = dispatch->getMessage();
+
+                    // A NULL Message signals the end of a Queue Browse.
+                    if( message != NULL ) {
+                        message->setReadOnlyBody( true );
+                        message->setReadOnlyProperties( true );
+                        message->setRedeliveryCounter( dispatch->getRedeliveryCounter() );
+                    }
 
                     dispatcher->dispatch( dispatch );
                 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Tue Mar 16 14:56:19 2010
@@ -21,8 +21,12 @@
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <decaf/lang/Math.h>
 #include <decaf/lang/System.h>
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
 #include <activemq/util/Config.h>
 #include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/util/ActiveMQProperties.h>
 #include <activemq/exceptions/ActiveMQException.h>
 #include <activemq/commands/Message.h>
 #include <activemq/commands/MessageAck.h>
@@ -40,6 +44,7 @@
 
 using namespace std;
 using namespace activemq;
+using namespace activemq::util;
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
@@ -187,19 +192,50 @@ namespace core {
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumer::ActiveMQConsumer( const Pointer<ConsumerInfo>& consumerInfo,
-                                    ActiveMQSession* session,
-                                    const Pointer<ActiveMQTransactionContext>& transaction ) {
+ActiveMQConsumer::ActiveMQConsumer( ActiveMQSession* session,
+                                    const Pointer<ConsumerId>& id,
+                                    const Pointer<ActiveMQDestination>& destination,
+                                    const std::string& name,
+                                    const std::string& selector,
+                                    int prefetch,
+                                    int maxPendingMessageCount,
+                                    bool noLocal,
+                                    bool browser,
+                                    bool dispatchAsync,
+                                    cms::MessageListener* listener ) {
 
-    if( session == NULL || consumerInfo == NULL ) {
+    if( session == NULL ) {
         throw ActiveMQException(
             __FILE__, __LINE__,
-            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
+            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session" );
     }
 
-    // Initialize Producer Data
+    if( destination == NULL ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Destination" );
+    }
+
+    if( destination->getPhysicalName() == "" ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumer::ActiveMQConsumer - Destination given has no Physical Name." );
+    }
+
+    Pointer<ConsumerInfo> consumerInfo( new ConsumerInfo() );
+
+    consumerInfo->setConsumerId( id );
+    consumerInfo->setDestination( destination );
+    consumerInfo->setSubscriptionName( name );
+    consumerInfo->setSelector( selector );
+    consumerInfo->setPrefetchSize( prefetch );
+    consumerInfo->setMaximumPendingMessageLimit( maxPendingMessageCount );
+    consumerInfo->setBrowser( browser );
+    consumerInfo->setDispatchAsync( dispatchAsync );
+    consumerInfo->setNoLocal( noLocal );
+
+    // Initialize Consumer Data
     this->session = session;
-    this->transaction = transaction;
     this->consumerInfo = consumerInfo;
     this->lastDeliveredSequenceId = -1;
     this->synchronizationRegistered = false;
@@ -208,6 +244,10 @@ ActiveMQConsumer::ActiveMQConsumer( cons
     this->deliveredCounter = 0;
     this->clearDispatchList = false;
     this->listener = NULL;
+
+    if( listener != NULL ) {
+        this->setMessageListener( listener );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -244,7 +284,8 @@ void ActiveMQConsumer::close()
 
     try{
         if( !this->isClosed() ) {
-            if( this->transaction != NULL && this->transaction->isInTransaction() ) {
+            if( this->session->getTransactionContext() != NULL &&
+                this->session->getTransactionContext()->isInTransaction() ) {
 
                 // TODO - Currently we can do this since the consumer could be
                 // deleted right after the close call so it won't stick around
@@ -313,7 +354,7 @@ void ActiveMQConsumer::doClose() throw (
             }
 
             // Remove this Consumer from the Connections set of Dispatchers
-            this->session->disposeOf( this->consumerInfo->getConsumerId(), lastDeliveredSequenceId );
+            this->session->removeConsumer( this->consumerInfo->getConsumerId(), lastDeliveredSequenceId );
 
             // Remove at the Broker Side, consumer has been removed from the local
             // Session and Connection objects so if the remote call to remove throws
@@ -689,7 +730,7 @@ void ActiveMQConsumer::ackLater( const P
             synchronizationRegistered = true;
 
             Pointer<Synchronization> sync( new TransactionSynhcronization( this ) );
-            this->transaction->addSynchronization( sync );
+            this->session->getTransactionContext()->addSynchronization( sync );
         }
     }
 
@@ -718,7 +759,7 @@ void ActiveMQConsumer::ackLater( const P
     }
 
     if( session->isTransacted() ) {
-        pendingAck->setTransactionId( this->transaction->getTransactionId() );
+        pendingAck->setTransactionId( this->session->getTransactionContext()->getTransactionId() );
     }
 
     if( ( 0.5 * this->consumerInfo->getPrefetchSize() ) <= ( deliveredCounter - additionalWindowSize ) ) {
@@ -809,7 +850,7 @@ void ActiveMQConsumer::acknowledge() thr
 
             if( session->isTransacted() ) {
                 session->doStartTransaction();
-                ack->setTransactionId( transaction->getTransactionId() );
+                ack->setTransactionId( session->getTransactionContext()->getTransactionId() );
             }
 
             session->oneway( ack );
@@ -850,7 +891,7 @@ void ActiveMQConsumer::rollback() throw(
             Pointer<MessageDispatch> lastMsg = dispatchedMessages.front();
             const int currentRedeliveryCount = lastMsg->getMessage()->getRedeliveryCounter();
             if( currentRedeliveryCount > 0 ) {
-                redeliveryDelay = transaction->getRedeliveryDelay();
+                redeliveryDelay = session->getTransactionContext()->getRedeliveryDelay();
             }
 
             Pointer<MessageId> firstMsgId =
@@ -863,7 +904,7 @@ void ActiveMQConsumer::rollback() throw(
                 message->setRedeliveryCounter( message->getRedeliveryCounter() + 1 );
             }
 
-            if( lastMsg->getRedeliveryCounter() > this->transaction->getMaximumRedeliveries() ) {
+            if( lastMsg->getRedeliveryCounter() > this->session->getTransactionContext()->getMaximumRedeliveries() ) {
 
                 // We need to NACK the messages so that they get sent to the DLQ.
                 // Acknowledge the last message.
@@ -1073,3 +1114,83 @@ bool ActiveMQConsumer::isAutoAcknowledge
 bool ActiveMQConsumer::isAutoAcknowledgeBatch() const {
     return this->session->isDupsOkAcknowledge() && !this->consumerInfo->getDestination()->isQueue();
 }
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConsumer::getMessageAvailableCount() const {
+    return this->unconsumedMessages.size();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
+
+    decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
+
+    // Get any options specified in the destination and apply them to the
+    // ConsumerInfo object.
+    const ActiveMQProperties& options = amqDestination->getOptions();
+
+    std::string noLocalStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
+    if( options.hasProperty( noLocalStr ) ) {
+        info->setNoLocal( Boolean::parseBoolean(
+            options.getProperty( noLocalStr ) ) );
+    }
+
+    std::string selectorStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
+    if( options.hasProperty( selectorStr ) ) {
+        info->setSelector( options.getProperty( selectorStr ) );
+    }
+
+    std::string priorityStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
+    if( options.hasProperty( priorityStr ) ) {
+        info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) );
+    }
+
+    std::string dispatchAsyncStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
+    if( options.hasProperty( dispatchAsyncStr ) ) {
+        info->setDispatchAsync(
+            Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
+    }
+
+    std::string exclusiveStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
+    if( options.hasProperty( exclusiveStr ) ) {
+        info->setExclusive(
+            Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
+    }
+
+    std::string maxPendingMsgLimitStr =
+        core::ActiveMQConstants::toString(
+            core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
+
+    if( options.hasProperty( maxPendingMsgLimitStr ) ) {
+        info->setMaximumPendingMessageLimit(
+            Integer::parseInt(
+                options.getProperty( maxPendingMsgLimitStr ) ) );
+    }
+
+    std::string prefetchSizeStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
+    if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr )  ) {
+        info->setPrefetchSize(
+            Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
+    }
+
+    std::string retroactiveStr =
+        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
+    if( options.hasProperty( retroactiveStr ) ) {
+        info->setRetroactive(
+            Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
+    }
+
+    std::string networkSubscriptionStr = "consumer.networkSubscription";
+
+    if( options.hasProperty( networkSubscriptionStr ) ) {
+        info->setNetworkSubscription(
+            Boolean::parseBoolean(
+                options.getProperty( networkSubscriptionStr ) ) );
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Tue Mar 16 14:56:19 2010
@@ -27,7 +27,6 @@
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/commands/MessageAck.h>
 #include <activemq/commands/MessageDispatch.h>
-#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/Dispatcher.h>
 #include <activemq/core/MessageDispatchChannel.h>
 
@@ -57,11 +56,6 @@ namespace core{
         ActiveMQSession* session;
 
         /**
-         * The Transaction Context, null if not in a Transacted Session.
-         */
-        Pointer<ActiveMQTransactionContext> transaction;
-
-        /**
          * The Consumer info for this Consumer
          */
         Pointer<commands::ConsumerInfo> consumerInfo;
@@ -136,9 +130,17 @@ namespace core{
         /**
          * Constructor
          */
-        ActiveMQConsumer( const Pointer<commands::ConsumerInfo>& consumerInfo,
-                          ActiveMQSession* session,
-                          const Pointer<ActiveMQTransactionContext>& transaction );
+        ActiveMQConsumer( ActiveMQSession* session,
+                          const Pointer<commands::ConsumerId>& id,
+                          const Pointer<commands::ActiveMQDestination>& destination,
+                          const std::string& name,
+                          const std::string& selector,
+                          int prefetch,
+                          int maxPendingMessageCount,
+                          bool noLocal,
+                          bool browser,
+                          bool dispatchAsync,
+                          cms::MessageListener* listener );
 
         virtual ~ActiveMQConsumer();
 
@@ -259,18 +261,18 @@ namespace core{
          * Get the Consumer information for this consumer
          * @return Reference to a Consumer Info Object
          */
-        const commands::ConsumerInfo& getConsumerInfo() const {
+        const Pointer<commands::ConsumerInfo>& getConsumerInfo() const {
             this->checkClosed();
-            return *( this->consumerInfo );
+            return this->consumerInfo;
         }
 
         /**
          * Get the Consumer Id for this consumer
          * @return Reference to a Consumer Id Object
          */
-        const commands::ConsumerId& getConsumerId() const {
+        const Pointer<commands::ConsumerId>& getConsumerId() const {
             this->checkClosed();
-            return *( this->consumerInfo->getConsumerId() );
+            return this->consumerInfo->getConsumerId();
         }
 
         /**
@@ -332,6 +334,11 @@ namespace core{
             this->lastDeliveredSequenceId = value;
         }
 
+        /**
+         * @returns the number of Messages this consumer is waiting to dispatch.
+         */
+        int getMessageAvailableCount() const;
+
     protected:
 
         /**
@@ -367,15 +374,16 @@ namespace core{
 
     private:
 
-        /**
-         * If supported sends a message pull request to the service provider asking
-         * for the delivery of a new message.  This is used in the case where the
-         * service provider has been configured with a zero prefetch or is only
-         * capable of delivering messages on a pull basis.  No request is made if
-         * there are already messages in the unconsumed queue since there's no need
-         * for a server round-trip in that instance.
-         * @param timeout - the time that the client is willing to wait.
-         */
+        // Options specified on the Destination URI override any settings that are
+        // defined for this consumer.
+        void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
+
+        // If supported sends a message pull request to the service provider asking
+        // for the delivery of a new message.  This is used in the case where the
+        // service provider has been configured with a zero prefetch or is only
+        // capable of delivering messages on a pull basis.  No request is made if
+        // there are already messages in the unconsumed queue since there's no need
+        // for a server round-trip in that instance.
         void sendPullRequest( long long timeout )
             throw ( exceptions::ActiveMQException );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Tue Mar 16 14:56:19 2010
@@ -20,13 +20,16 @@
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/util/CMSExceptionSupport.h>
+#include <activemq/util/ActiveMQProperties.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 #include <decaf/lang/System.h>
+#include <decaf/lang/Boolean.h>
 
 using namespace std;
 using namespace activemq;
+using namespace activemq::util;
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
@@ -35,20 +38,38 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducer::ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
-                                    const Pointer<cms::Destination>& destination,
-                                    ActiveMQSession* session ) {
+ActiveMQProducer::ActiveMQProducer( ActiveMQSession* session,
+                                    const Pointer<commands::ProducerId>& producerId,
+                                    const Pointer<ActiveMQDestination>& destination,
+                                    long long sendTimeout ) {
 
-    if( session == NULL || producerInfo == NULL ) {
+    if( session == NULL || producerId == NULL ) {
         throw ActiveMQException(
             __FILE__, __LINE__,
             "ActiveMQProducer::ActiveMQProducer - Init with NULL Session" );
     }
 
+    this->producerInfo.reset( new ProducerInfo() );
+
+    this->producerInfo->setProducerId( producerId );
+    this->producerInfo->setDestination( destination );
+    this->producerInfo->setWindowSize( session->getConnection()->getProducerWindowSize() );
+
+    // Get any options specified in the destination and apply them to the
+    // ProducerInfo object.
+    if( destination != NULL ) {
+        const ActiveMQProperties& options = destination->getOptions();
+        this->producerInfo->setDispatchAsync( Boolean::parseBoolean(
+            options.getProperty( "producer.dispatchAsync", "false" )) );
+
+        this->destination = destination.dynamicCast<cms::Destination>();
+    }
+
+    // TODO - Check for need of MemoryUsage if there's a producer window size
+    //        and the Protocol version is greater than 3.
+
     // Init Producer Data
     this->session = session;
-    this->producerInfo = producerInfo;
-    this->destination = destination;
     this->closed = false;
 
     // Default the Delivery options
@@ -57,6 +78,7 @@ ActiveMQProducer::ActiveMQProducer( cons
     this->disableMessageId = false;
     this->defaultPriority = 4;
     this->defaultTimeToLive = 0;
+    this->sendTimeout = sendTimeout;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -75,7 +97,7 @@ void ActiveMQProducer::close() throw ( c
 
         if( !this->isClosed() ) {
 
-            this->session->disposeOf( this->producerInfo->getProducerId() );
+            this->session->removeProducer( this->producerInfo->getProducerId() );
             this->closed = true;
 
             // Remove at the Broker Side, if this fails the producer has already

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Tue Mar 16 14:56:19 2010
@@ -78,17 +78,20 @@ namespace core{
         /**
          * Constructor, creates an instance of an ActiveMQProducer
          *
-         * @param producerInfo
-         *        Pointer to a ProducerInfo command which identifies this producer.
+         * @param session
+         *        The Session which is the parent of this Producer.
+         * @param producerId
+         *        Pointer to a ProducerId object which identifies this producer.
          * @param destination
          *        The assigned Destination this Producer sends to, or null if not set.
          *        The Producer does not own the Pointer passed.
-         * @param session
-         *        The Session which is the parent of this Producer.
+         * @param sendTimeout
+         *        The configured send timeout for this Producer.
          */
-        ActiveMQProducer( const Pointer<commands::ProducerInfo>& producerInfo,
-                          const Pointer<cms::Destination>& destination,
-                          ActiveMQSession* session );
+        ActiveMQProducer( ActiveMQSession* session,
+                          const Pointer<commands::ProducerId>& producerId,
+                          const Pointer<commands::ActiveMQDestination>& destination,
+                          long long sendTimeout );
 
         virtual ~ActiveMQProducer();
 
@@ -308,18 +311,18 @@ namespace core{
          * Retries this object ProducerInfo pointer
          * @return ProducerInfo Reference
          */
-        const commands::ProducerInfo& getProducerInfo() const {
+        const Pointer<commands::ProducerInfo>& getProducerInfo() const {
             this->checkClosed();
-            return *( this->producerInfo );
+            return this->producerInfo;
         }
 
         /**
          * Retries this object ProducerId or NULL if closed.
          * @return ProducerId Reference
          */
-        commands::ProducerId& getProducerId() const {
+        const Pointer<commands::ProducerId>& getProducerId() const {
             this->checkClosed();
-            return *( this->producerInfo->getProducerId() );
+            return this->producerInfo->getProducerId();
         }
 
         /**

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp Tue Mar 16 14:56:19 2010
@@ -0,0 +1,295 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQQueueBrowser.h"
+
+#include <cms/MessageListener.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+
+    /**
+     * Consumer specialization used by ActiveMQQueueBrowser.  It intercepts
+     * every dispatch so that the owning browser learns when the browse has
+     * finished and is woken up whenever something arrives.
+     */
+    class Browser : public ActiveMQConsumer {
+    public:
+
+        // The queue browser that constructed this consumer and that is
+        // notified from dispatch().
+        ActiveMQQueueBrowser* parent;
+
+    public:
+
+        /**
+         * Stores the parent browser and forwards every other argument,
+         * unchanged and in the same order, to the ActiveMQConsumer constructor.
+         */
+        Browser( ActiveMQQueueBrowser* parent, ActiveMQSession* session,
+                 const Pointer<commands::ConsumerId>& id,
+                 const Pointer<commands::ActiveMQDestination>& destination,
+                 const std::string& name, const std::string& selector,
+                 int prefetch, int maxPendingMessageCount, bool noLocal,
+                 bool browser, bool dispatchAsync,
+                 cms::MessageListener* listener ) :
+            ActiveMQConsumer( session, id, destination, name, selector, prefetch,
+                              maxPendingMessageCount, noLocal, browser, dispatchAsync,
+                              listener ),
+            parent( parent ) {
+        }
+
+        /**
+         * Handles a single dispatch.  A dispatch that carries no message sets
+         * the parent's browseDone flag; any other dispatch is handed to
+         * ActiveMQConsumer::dispatch.  The parent is notified in both cases.
+         *
+         * @param dispatched
+         *        The dispatch to process.
+         */
+        virtual void dispatch( const Pointer<MessageDispatch>& dispatched ) {
+
+            try{
+
+                const bool carriesMessage = !( dispatched->getMessage() == NULL );
+
+                if( carriesMessage ) {
+                    ActiveMQConsumer::dispatch( dispatched );
+                } else {
+                    this->parent->browseDone.set( true );
+                }
+
+                // Wake anyone blocked in the parent's waitForMessageAvailable().
+                this->parent->notifyMessageAvailable();
+            }
+            AMQ_CATCH_RETHROW( ActiveMQException )
+            AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+            AMQ_CATCHALL_THROW( ActiveMQException )
+        }
+
+    };
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds a browser over the given queue destination.  No consumer is created
+ * here; that happens on the first call to getEnumeration().
+ *
+ * @throws ActiveMQException if session or consumerId is NULL, or if
+ *         destination is NULL or is not a Queue.
+ */
+ActiveMQQueueBrowser::ActiveMQQueueBrowser( ActiveMQSession* session,
+                                            const Pointer<commands::ConsumerId>& consumerId,
+                                            const Pointer<commands::ActiveMQDestination>& destination,
+                                            const std::string& selector,
+                                            bool dispatchAsync ) {
+
+    // Reject unusable arguments up front, in the order session, id, destination.
+    if( session == NULL ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__, "Session instance provided was NULL." );
+    }
+
+    if( consumerId == NULL ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__, "ConsumerId instance provided was NULL." );
+    }
+
+    const bool unusableDestination = destination == NULL || !destination->isQueue();
+    if( unusableDestination ) {
+        throw ActiveMQException(
+            __FILE__, __LINE__, "Destination instance provided was NULL or not a Queue." );
+    }
+
+    // Cache the Queue instance for faster retrieval by getQueue().
+    this->queue = destination.dynamicCast<cms::Queue>().get();
+
+    this->session = session;
+    this->consumerId = consumerId;
+    this->destination = destination;
+    this->selector = selector;
+    this->dispatchAsync = dispatchAsync;
+
+    // Start out open and without an underlying consumer.
+    this->browser = NULL;
+    this->closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the browser on destruction; nothing thrown by close() is allowed
+ * to leave the destructor.
+ */
+ActiveMQQueueBrowser::~ActiveMQQueueBrowser() {
+    try{
+        close();
+    }
+    DECAF_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @return the Queue pointer cached by the constructor.
+ */
+const cms::Queue* ActiveMQQueueBrowser::getQueue() const throw ( cms::CMSException ) {
+    const cms::Queue* cachedQueue = this->queue;
+    return cachedQueue;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @return a copy of the selector string this browser was constructed with.
+ */
+std::string ActiveMQQueueBrowser::getMessageSelector() const throw ( cms::CMSException ) {
+    return selector;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Returns this object as the enumeration over the browsed messages, creating
+ * the underlying consumer first if none exists yet.
+ *
+ * @throws cms::CMSException if the browser is closed or the consumer cannot
+ *         be created.
+ */
+cms::MessageEnumeration* ActiveMQQueueBrowser::getEnumeration() throw ( cms::CMSException ) {
+
+    try{
+        checkClosed();
+
+        // Create the consumer lazily; an existing one is reused as is.
+        const bool needsConsumer = ( this->browser == NULL );
+        if( needsConsumer ) {
+            this->browser = createConsumer();
+        }
+
+        return this;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the browser: tears down the underlying consumer and marks this
+ * object closed.  Calling it on an already closed browser does nothing.
+ */
+void ActiveMQQueueBrowser::close() throw( cms::CMSException ) {
+    try{
+
+        if( !this->closed ) {
+
+            // Teardown and the state change happen together under the mutex.
+            synchronized( &mutex ) {
+                destroyConsumer();
+                this->closed = true;
+            }
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reports whether another message can be obtained from the enumeration.
+ * Loops until a message is available (true), or until there is no consumer,
+ * the browse is flagged done, or the session is not started (false).  In the
+ * latter two cases the consumer is destroyed before returning.
+ */
+bool ActiveMQQueueBrowser::hasMoreMessages() {
+
+    try{
+
+        for( ;; ) {
+
+            // Without a consumer there is nothing to enumerate.
+            synchronized( &mutex ) {
+                if( this->browser == NULL ) {
+                    return false;
+                }
+            }
+
+            const bool messagePending = this->browser->getMessageAvailableCount() > 0;
+            if( messagePending ) {
+                return true;
+            }
+
+            const bool browseOver = browseDone.get() || !this->session->isStarted();
+            if( browseOver ) {
+                destroyConsumer();
+                return false;
+            }
+
+            // Nothing yet: block until notified or the wait times out, then retry.
+            waitForMessageAvailable();
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Returns the next browsed message, or NULL when none can be delivered.
+ * NULL is returned when there is no consumer, when receiveNoWait() throws a
+ * cms::CMSException, or when the browse is flagged done or the session is
+ * not started (the consumer is destroyed in that last case).  Otherwise the
+ * call waits and retries.
+ */
+cms::Message* ActiveMQQueueBrowser::nextMessage() throw( cms::CMSException ) {
+
+    try{
+
+        for( ;; ) {
+
+            // Without a consumer there is nothing to hand out.
+            synchronized( &mutex ) {
+                if( this->browser == NULL ) {
+                    return NULL;
+                }
+            }
+
+            try {
+
+                cms::Message* received = this->browser->receiveNoWait();
+                if( received != NULL ) {
+                    return received;
+                }
+
+            } catch( cms::CMSException& ) {
+                // A failed receive is reported to the caller as "no message".
+                return NULL;
+            }
+
+            const bool browseOver = this->browseDone.get() || !this->session->isStarted();
+            if( browseOver ) {
+                destroyConsumer();
+                return NULL;
+            }
+
+            // Nothing yet: block until notified or the wait times out, then retry.
+            waitForMessageAvailable();
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Wakes every thread currently blocked in waitForMessageAvailable().
+ */
+void ActiveMQQueueBrowser::notifyMessageAvailable() {
+
+    // The notification is issued while holding the wait monitor.
+    synchronized( &this->wait ) {
+        this->wait.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Blocks on the wait monitor until notifyMessageAvailable() is called or the
+ * timeout of 2000 elapses (presumably milliseconds; confirm against
+ * decaf::util::concurrent::Mutex::wait).
+ */
+void ActiveMQQueueBrowser::waitForMessageAvailable() {
+
+    synchronized( &this->wait ) {
+        this->wait.wait( 2000 );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates the Browser consumer used to pull messages for this QueueBrowser,
+ * registers it with the session, sends its ConsumerInfo through the
+ * session's syncRequest, and starts it if the connection is already started.
+ * The browseDone flag is reset first.
+ *
+ * @return a newly allocated consumer; ownership passes to the caller.
+ *
+ * @throws Exception (or the derived type originally thrown) if registering
+ *         the consumer or the sync request fails.  The consumer is removed
+ *         from the session again before the exception propagates.
+ */
+ActiveMQConsumer* ActiveMQQueueBrowser::createConsumer() {
+
+    this->browseDone.set( false );
+    // TODO - get config options from connection and prefetch policy.
+    std::auto_ptr<ActiveMQConsumer> consumer(
+        new Browser( this, session, consumerId, destination, "", selector,
+                     500, 0, false, true, false, NULL ) );
+
+    try{
+        this->session->addConsumer( consumer.get() );
+        this->session->syncRequest( consumer->getConsumerInfo() );
+    } catch( Exception& ) {
+        this->session->removeConsumer( consumer->getConsumerId() );
+        // Rethrow the in-flight exception itself.  "throw ex;" would throw a
+        // copy typed as Exception and slice off any derived exception type.
+        throw;
+    }
+
+    if( this->session->getConnection()->isStarted() ) {
+        consumer->start();
+    }
+
+    return consumer.release();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Tears down the underlying consumer, if there is one: commits the session
+ * when it is transacted, closes the consumer, and deletes it.  Failures in
+ * any step are swallowed, but the consumer is always deleted and the
+ * browser member is always reset to NULL.
+ */
+void ActiveMQQueueBrowser::destroyConsumer() {
+
+    if( this->browser == NULL ) {
+        return;
+    }
+
+    // Detach the consumer from this object first.  Previously an exception
+    // from commit() or close() skipped both the delete and the NULL reset,
+    // leaking the consumer and leaving a stale pointer behind.
+    ActiveMQConsumer* doomed = this->browser;
+    this->browser = NULL;
+
+    try {
+        if( this->session->isTransacted() ) {
+            session->commit();
+        }
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    // A failed commit must not prevent the consumer from being closed.
+    try {
+        doomed->close();
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete doomed;
+    }
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Guard for operations that require an open browser.
+ *
+ * @throws ActiveMQException if close() has already marked this browser closed.
+ */
+void ActiveMQQueueBrowser::checkClosed() {
+
+    if( !this->closed ) {
+        return;
+    }
+
+    throw ActiveMQException(
+        __FILE__, __LINE__, "The QueueBrowser is closed." );
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h Tue Mar 16 14:56:19 2010
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQQUEUEBROWSER_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQQUEUEBROWSER_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/Queue.h>
+#include <cms/QueueBrowser.h>
+#include <cms/MessageEnumeration.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+
+#include <string>
+
+namespace activemq {
+namespace core {
+
+    class ActiveMQConsumer;
+    class ActiveMQSession;
+    class Browser;
+
+    using decaf::lang::Pointer;
+
+    /**
+     * CMS QueueBrowser implementation.  The same object also serves as the
+     * cms::MessageEnumeration returned by getEnumeration(); messages are
+     * pulled through an internal consumer that is created on demand and
+     * destroyed once the browse is done or the browser is closed.
+     */
+    class AMQCPP_API ActiveMQQueueBrowser : public cms::QueueBrowser,
+                                            public cms::MessageEnumeration {
+    private:
+
+        // Browser (defined in the .cpp) sets browseDone and calls
+        // notifyMessageAvailable() from its dispatch method.
+        friend class Browser;
+
+        // Session used to register the consumer and send its ConsumerInfo.
+        ActiveMQSession* session;
+
+        // Id handed to the internal consumer when it is created.
+        Pointer<commands::ConsumerId> consumerId;
+
+        // The destination being browsed.
+        Pointer<commands::ActiveMQDestination> destination;
+
+        // Message selector passed on to the internal consumer.
+        std::string selector;
+
+        // Value supplied at construction time.
+        bool dispatchAsync;
+
+        // Raw cms::Queue view of destination, returned by getQueue().
+        cms::Queue* queue;
+
+        // Set once close() has completed.
+        volatile bool closed;
+
+        // Held by close() and while checking whether a consumer exists.
+        mutable decaf::util::concurrent::Mutex mutex;
+
+        // Monitor used by waitForMessageAvailable() / notifyMessageAvailable().
+        mutable decaf::util::concurrent::Mutex wait;
+
+        // Set by the internal consumer when a dispatch carries no message.
+        decaf::util::concurrent::atomic::AtomicBoolean browseDone;
+
+        // The internal consumer, or NULL when none currently exists.
+        mutable ActiveMQConsumer* browser;
+
+    public:
+
+        /**
+         * @param session
+         *        The session this browser works through; must not be NULL.
+         * @param consumerId
+         *        The id for the internal consumer; must not be NULL.
+         * @param destination
+         *        The destination to browse; must be a non-NULL Queue.
+         * @param selector
+         *        The message selector to use.
+         * @param dispatchAsync
+         *        Stored on construction.
+         *
+         * @throws ActiveMQException if any argument fails validation.
+         */
+        ActiveMQQueueBrowser( ActiveMQSession* session,
+                              const Pointer<commands::ConsumerId>& consumerId,
+                              const Pointer<commands::ActiveMQDestination>& destination,
+                              const std::string& selector,
+                              bool dispatchAsync );
+
+        virtual ~ActiveMQQueueBrowser();
+
+        // Returns the cached Queue pointer.
+        virtual const cms::Queue* getQueue() const throw ( cms::CMSException );
+
+        // Returns the selector given at construction.
+        virtual std::string getMessageSelector() const throw ( cms::CMSException );
+
+        // Returns this object, creating the internal consumer if needed.
+        virtual cms::MessageEnumeration* getEnumeration() throw ( cms::CMSException );
+
+        // Destroys the internal consumer and marks the browser closed.
+        virtual void close() throw( cms::CMSException );
+
+        // True when a message is available; may block while waiting for one.
+        virtual bool hasMoreMessages();
+
+        // Returns the next message or NULL; may block while waiting for one.
+        virtual cms::Message* nextMessage() throw( cms::CMSException );
+
+    private:
+
+        // Throws ActiveMQException when the browser has been closed.
+        void checkClosed();
+
+        // Wakes threads blocked in waitForMessageAvailable().
+        void notifyMessageAvailable();
+
+        // Blocks until notified or until the built-in timeout expires.
+        void waitForMessageAvailable();
+
+        // Builds, registers and (if possible) starts the internal consumer.
+        ActiveMQConsumer* createConsumer();
+
+        // Closes and deletes the internal consumer, if any.
+        void destroyConsumer();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ACTIVEMQQUEUEBROWSER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQQueueBrowser.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Tue Mar 16 14:56:19 2010
@@ -22,6 +22,7 @@
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/ActiveMQConsumer.h>
 #include <activemq/core/ActiveMQProducer.h>
+#include <activemq/core/ActiveMQQueueBrowser.h>
 #include <activemq/core/ActiveMQSessionExecutor.h>
 #include <activemq/util/ActiveMQProperties.h>
 #include <activemq/util/CMSExceptionSupport.h>
@@ -303,31 +304,33 @@ cms::MessageConsumer* ActiveMQSession::c
 
         this->checkClosed();
 
-        Pointer<ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
-
-        consumerInfo->setSelector( selector );
-        consumerInfo->setNoLocal( noLocal );
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
 
-        // Override default options with uri-encoded parameters.
-        this->applyDestinationOptions( consumerInfo );
+        if( amqDestination == NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
 
-        // Register this as a message dispatcher for the consumer since we
-        // could start receiving messages from the broker right away once we
-        // send the ConsumerInfo command.
-        this->connection->addDispatcher( consumerInfo->getConsumerId(), this );
+        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
-
-        // Add the consumer to the map.
-        synchronized( &this->consumers ) {
-            this->consumers.put( consumer->getConsumerInfo().getConsumerId(), consumer.get() );
+            new ActiveMQConsumer( this, this->getNextConsumerId(),
+                                  dest, "", selector, 1000, 0, noLocal,
+                                  false, false, NULL ) );
+
+        try{
+            this->addConsumer( consumer.get() );
+            this->connection->syncRequest( consumer->getConsumerInfo() );
+        } catch( Exception& ex ) {
+            this->removeConsumer( consumer->getConsumerId() );
+            throw ex;
         }
 
-        // Send our info to the Broker.
-        this->syncRequest( consumerInfo );
-
         if( this->connection->isStarted() ) {
             consumer->start();
         }
@@ -349,32 +352,33 @@ cms::MessageConsumer* ActiveMQSession::c
 
         this->checkClosed();
 
-        Pointer<ConsumerInfo> consumerInfo( createConsumerInfo( destination ) );
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
+
+        if( amqDestination == NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
 
-        consumerInfo->setSelector( selector );
-        consumerInfo->setNoLocal( noLocal );
-        consumerInfo->setSubscriptionName( name );
-
-        // Override default options with uri-encoded parameters.
-        this->applyDestinationOptions( consumerInfo );
-
-        // Register this as a message dispatcher for the consumer since we
-        // could start receiving messages from the broker right away once we
-        // send the ConsumerInfo command.
-        this->connection->addDispatcher( consumerInfo->getConsumerId(), this );
+        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
 
         // Create the consumer instance.
         std::auto_ptr<ActiveMQConsumer> consumer(
-            new ActiveMQConsumer( consumerInfo, this, this->transaction ) );
-
-        // Add the consumer to the map.
-        synchronized( &this->consumers ) {
-            this->consumers.put( consumer->getConsumerInfo().getConsumerId(), consumer.get() );
+            new ActiveMQConsumer( this, this->getNextConsumerId(),
+                                  dest, name, selector, 1000, 0, noLocal,
+                                  false, false, NULL ) );
+
+        try{
+            this->addConsumer( consumer.get() );
+            this->connection->syncRequest( consumer->getConsumerInfo() );
+        } catch( Exception& ex ) {
+            this->removeConsumer( consumer->getConsumerId() );
+            throw ex;
         }
 
-        // Send our info to the Broker.
-        this->syncRequest( consumerInfo );
-
         if( this->connection->isStarted() ) {
             consumer->start();
         }
@@ -393,85 +397,80 @@ cms::MessageProducer* ActiveMQSession::c
 
         this->checkClosed();
 
-        Pointer<cms::Destination> clonedDestination(
-            destination != NULL ? destination->clone() : NULL );
-
-        decaf::lang::Pointer<commands::ProducerId> producerId( new commands::ProducerId() );
-        producerId->setConnectionId( this->sessionInfo->getSessionId()->getConnectionId() );
-        producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-        producerId->setValue( this->getNextProducerId() );
-
-        Pointer<commands::ProducerInfo> producerInfo( new commands::ProducerInfo() );
-        producerInfo->setProducerId( producerId );
-        producerInfo->setWindowSize( this->connection->getProducerWindowSize() );
+        Pointer<commands::ActiveMQDestination> dest;
 
         // Producers are allowed to have NULL destinations.  In this case, the
         // destination is specified by the messages as they are sent.
-        if( clonedDestination != NULL ) {
+        if( destination != NULL ) {
+
+            const ActiveMQDestination* amqDestination =
+                dynamic_cast<const ActiveMQDestination*>( destination );
+
+            if( amqDestination == NULL ) {
+                throw ActiveMQException(
+                    __FILE__, __LINE__,
+                    "Destination was either NULL or not created by this CMS Client" );
+            }
 
             // Cast the destination to an OpenWire destination, so we can
             // get all the goodies.
-            Pointer<commands::ActiveMQDestination> amqDestination =
-                clonedDestination.dynamicCast<commands::ActiveMQDestination>();
-
-            // Get any options specified in the destination and apply them to the
-            // ProducerInfo object.
-            producerInfo->setDestination( amqDestination );
-            const ActiveMQProperties& options = amqDestination->getOptions();
-            producerInfo->setDispatchAsync( Boolean::parseBoolean(
-                options.getProperty( "producer.dispatchAsync", "false" )) );
+            dest.reset( amqDestination->cloneDataStructure() );
         }
 
         // Create the producer instance.
-        std::auto_ptr<ActiveMQProducer> producer(
-            new ActiveMQProducer( producerInfo, clonedDestination, this ) );
+        std::auto_ptr<ActiveMQProducer> producer( new ActiveMQProducer(
+            this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
 
-        producer->setSendTimeout( this->connection->getSendTimeout() );
-
-        synchronized( &this->producers ) {
-            // Place the Producer into the Map.
-            this->producers.put( producerId, producer.get() );
+        try{
+            this->addProducer( producer.get() );
+            this->connection->oneway( producer->getProducerInfo() );
+        } catch( Exception& ex ) {
+            this->removeProducer( producer->getProducerId() );
+            throw ex;
         }
 
-        // Add to the Connections list
-        this->connection->addProducer( producer.get() );
-
-        this->syncRequest( producerInfo );
-
         return producer.release();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue AMQCPP_UNUSED )
+cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue )
     throw( cms::CMSException ) {
 
     try{
-
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__,
-            "createBrowser Method is not yet supported." );
-
-        // Fix for not so intelligent Sun Compiler
-        return NULL;
+        return ActiveMQSession::createBrowser( queue, "" );
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue AMQCPP_UNUSED,
-                                                   const std::string& selector AMQCPP_UNUSED )
+cms::QueueBrowser* ActiveMQSession::createBrowser( const cms::Queue* queue,
+                                                   const std::string& selector )
     throw( cms::CMSException ) {
 
     try{
 
-        throw UnsupportedOperationException(
-            __FILE__, __LINE__,
-            "createBrowser Method is not yet supported." );
+        this->checkClosed();
+
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( queue );
 
-        // Fix for not so intelligent Sun Compiler
-        return NULL;
+        if( amqDestination == NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
+
+        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
+
+        // Create the QueueBrowser instance
+        std::auto_ptr<ActiveMQQueueBrowser> browser(
+            new ActiveMQQueueBrowser( this, this->getNextConsumerId(), dest, selector, false ) );
+
+        return browser.release();
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }
@@ -687,7 +686,7 @@ void ActiveMQSession::send(
         // Always assign the message ID, regardless of the disable
         // flag.  Not adding a message ID will cause an NPE at the broker.
         decaf::lang::Pointer<commands::MessageId> id( new commands::MessageId() );
-        id->setProducerId( producer->getProducerInfo().getProducerId() );
+        id->setProducerId( producer->getProducerInfo()->getProducerId() );
         id->setProducerSequenceId( this->getNextProducerSequenceId() );
 
         amqMessage->setMessageId( id );
@@ -718,7 +717,7 @@ void ActiveMQSession::send(
         Pointer<commands::Message> msgCopy( amqMessage->cloneDataStructure() );
 
         msgCopy->onSend();
-        msgCopy->setProducerId( producer->getProducerInfo().getProducerId() );
+        msgCopy->setProducerId( producer->getProducerInfo()->getProducerId() );
 
         if( this->connection->getSendTimeout() <= 0 &&
             !msgCopy->isResponseRequired() &&
@@ -827,143 +826,6 @@ bool ActiveMQSession::isStarted() const 
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ConsumerInfo* ActiveMQSession::createConsumerInfo(
-    const cms::Destination* destination ) throw ( activemq::exceptions::ActiveMQException ) {
-
-    try{
-
-        this->checkClosed();
-
-        std::auto_ptr<ConsumerInfo> consumerInfo( new commands::ConsumerInfo() );
-        decaf::lang::Pointer<ConsumerId> consumerId( new commands::ConsumerId() );
-
-        consumerId->setConnectionId(
-            this->connection->getConnectionId().getValue() );
-        consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
-        consumerId->setValue( this->getNextConsumerId() );
-
-        consumerInfo->setConsumerId( consumerId );
-
-        // Cast the destination to an OpenWire destination, so we can
-        // get all the goodies.
-        const ActiveMQDestination* amqDestination =
-            dynamic_cast<const ActiveMQDestination*>( destination );
-
-        if( amqDestination == NULL ) {
-            throw activemq::exceptions::ActiveMQException( __FILE__, __LINE__,
-                "Destination was either NULL or not created by this OpenWireConnector" );
-        }
-
-        consumerInfo->setDestination(
-            Pointer<ActiveMQDestination>( amqDestination->cloneDataStructure() ) );
-
-        return consumerInfo.release();
-    }
-    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
-    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::applyDestinationOptions( const Pointer<ConsumerInfo>& info ) {
-
-    decaf::lang::Pointer<commands::ActiveMQDestination> amqDestination = info->getDestination();
-
-    // Get any options specified in the destination and apply them to the
-    // ConsumerInfo object.
-    const ActiveMQProperties& options = amqDestination->getOptions();
-
-    std::string noLocalStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_NOLOCAL );
-    if( options.hasProperty( noLocalStr ) ) {
-        info->setNoLocal( Boolean::parseBoolean(
-            options.getProperty( noLocalStr ) ) );
-    }
-
-    std::string selectorStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_SELECTOR );
-    if( options.hasProperty( selectorStr ) ) {
-        info->setSelector( options.getProperty( selectorStr ) );
-    }
-
-    std::string priorityStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PRIORITY );
-    if( options.hasProperty( priorityStr ) ) {
-        info->setPriority( (unsigned char)Integer::parseInt( options.getProperty( priorityStr ) ) );
-    }
-
-    std::string dispatchAsyncStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_DISPATCHASYNC );
-    if( options.hasProperty( dispatchAsyncStr ) ) {
-        info->setDispatchAsync(
-            Boolean::parseBoolean( options.getProperty( dispatchAsyncStr ) ) );
-    }
-
-    std::string exclusiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_EXCLUSIVE );
-    if( options.hasProperty( exclusiveStr ) ) {
-        info->setExclusive(
-            Boolean::parseBoolean( options.getProperty( exclusiveStr ) ) );
-    }
-
-    std::string maxPendingMsgLimitStr =
-        core::ActiveMQConstants::toString(
-            core::ActiveMQConstants::CUNSUMER_MAXPENDINGMSGLIMIT );
-
-    if( options.hasProperty( maxPendingMsgLimitStr ) ) {
-        info->setMaximumPendingMessageLimit(
-            Integer::parseInt(
-                options.getProperty( maxPendingMsgLimitStr ) ) );
-    }
-
-    std::string prefetchSizeStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_PREFECTCHSIZE );
-    if( info->getPrefetchSize() <= 0 || options.hasProperty( prefetchSizeStr )  ) {
-        info->setPrefetchSize(
-            Integer::parseInt( options.getProperty( prefetchSizeStr, "1000" ) ) );
-    }
-
-    std::string retroactiveStr =
-        core::ActiveMQConstants::toString( core::ActiveMQConstants::CONSUMER_RETROACTIVE );
-    if( options.hasProperty( retroactiveStr ) ) {
-        info->setRetroactive(
-            Boolean::parseBoolean( options.getProperty( retroactiveStr ) ) );
-    }
-
-    std::string browserStr = "consumer.browser";
-
-    if( options.hasProperty( browserStr ) ) {
-        info->setBrowser(
-            Boolean::parseBoolean(
-                options.getProperty( browserStr ) ) );
-    }
-
-    std::string networkSubscriptionStr = "consumer.networkSubscription";
-
-    if( options.hasProperty( networkSubscriptionStr ) ) {
-        info->setNetworkSubscription(
-            Boolean::parseBoolean(
-                options.getProperty( networkSubscriptionStr ) ) );
-    }
-
-    std::string optimizedAcknowledgeStr = "consumer.optimizedAcknowledge";
-
-    if( options.hasProperty( optimizedAcknowledgeStr ) ) {
-        info->setOptimizedAcknowledge(
-            Boolean::parseBoolean(
-                options.getProperty( optimizedAcknowledgeStr ) ) );
-    }
-
-    std::string noRangeAcksStr = "consumer.noRangeAcks";
-
-    if( options.hasProperty( noRangeAcksStr ) ) {
-        info->setNoRangeAcks(
-            Boolean::parseBoolean(
-                options.getProperty( noRangeAcksStr ) ) );
-    }
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::createTemporaryDestination(
     commands::ActiveMQTempDestination* tempDestination )
         throw ( activemq::exceptions::ActiveMQException ) {
@@ -1058,7 +920,28 @@ void ActiveMQSession::checkClosed() cons
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( Pointer<ConsumerId> id, long long lastDeliveredSequenceId )
+void ActiveMQSession::addConsumer( ActiveMQConsumer* consumer )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        this->checkClosed();
+
+        // Add the consumer to the map.
+        synchronized( &this->consumers ) {
+            this->consumers.put( consumer->getConsumerInfo()->getConsumerId(), consumer );
+        }
+
+        // Register this as a message dispatcher for the consumer.
+        this->connection->addDispatcher( consumer->getConsumerInfo()->getConsumerId(), this );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::removeConsumer( const Pointer<ConsumerId>& consumerId, long long lastDeliveredSequenceId )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -1067,12 +950,12 @@ void ActiveMQSession::disposeOf( Pointer
 
         synchronized( &this->consumers ) {
 
-            if( this->consumers.containsKey( id ) ) {
+            if( this->consumers.containsKey( consumerId ) ) {
 
                 // Remove this Id both from the Sessions Map of Consumers and from
                 // the Connection.
-                this->connection->removeDispatcher( id );
-                this->consumers.remove( id );
+                this->connection->removeDispatcher( consumerId );
+                this->consumers.remove( consumerId );
                 this->lastDeliveredSequenceId =
                     Math::max( this->lastDeliveredSequenceId, lastDeliveredSequenceId );
             }
@@ -1084,7 +967,28 @@ void ActiveMQSession::disposeOf( Pointer
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::disposeOf( Pointer<ProducerId> id )
+void ActiveMQSession::addProducer( ActiveMQProducer* producer )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try{
+
+        this->checkClosed();
+
+        synchronized( &this->producers ) {
+            // Place the Producer into the Map.
+            this->producers.put( producer->getProducerInfo()->getProducerId(), producer );
+        }
+
+        // Add to the Connections list
+        this->connection->addProducer( producer );
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::removeProducer( const Pointer<ProducerId>& producerId )
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -1093,10 +997,10 @@ void ActiveMQSession::disposeOf( Pointer
 
         synchronized( &this->producers ) {
 
-            if( this->producers.containsKey( id ) ) {
+            if( this->producers.containsKey( producerId ) ) {
 
-                this->connection->removeProducer( id );
-                this->producers.remove( id );
+                this->connection->removeProducer( producerId );
+                this->producers.remove( producerId );
             }
         }
     }
@@ -1122,3 +1026,27 @@ void ActiveMQSession::wakeup() {
         this->executor->wakeup();
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<commands::ConsumerId> ActiveMQSession::getNextConsumerId() {
+    Pointer<ConsumerId> consumerId( new commands::ConsumerId() );
+
+    consumerId->setConnectionId(
+        this->connection->getConnectionId().getValue() );
+    consumerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
+    consumerId->setValue( this->consumerIds.getNextSequenceId() );
+
+    return consumerId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<commands::ProducerId> ActiveMQSession::getNextProducerId() {
+    Pointer<ProducerId> producerId( new ProducerId() );
+
+    producerId->setConnectionId(
+        this->connection->getConnectionId().getValue() );
+    producerId->setSessionId( this->sessionInfo->getSessionId()->getValue() );
+    producerId->setValue( this->producerIds.getNextSequenceId() );
+
+    return producerId;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Tue Mar 16 14:56:19 2010
@@ -34,6 +34,7 @@
 #include <activemq/core/MessageDispatchChannel.h>
 #include <activemq/util/LongSequenceGenerator.h>
 
+#include <decaf/lang/Pointer.h>
 #include <decaf/util/StlMap.h>
 #include <decaf/util/StlQueue.h>
 #include <decaf/util/Properties.h>
@@ -547,28 +548,55 @@ namespace core{
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
-         * Dispose of a Consumer from this session.  Removes it from the Connection
-         * and clean up any resources associated with it.
+         * Adds a MessageConsumer to this session, registering it with the Connection and storing
+         * a reference to it so the session can ensure that all resources are closed when
+         * the session is closed.
+         *
+         * @param consumer
+         *      The ActiveMQConsumer instance to add to this session.
          *
-         * @param id
-         *      The Id of the Consumer to dispose.
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void addConsumer( ActiveMQConsumer* consumer )
+            throw ( activemq::exceptions::ActiveMQException );
+
+        /**
+         * Dispose of a MessageConsumer from this session.  Removes it from the Connection
+         * and cleans up any resources associated with it.
          *
+         * @param consumerId
+         *      The ConsumerId of the MessageConsumer to remove from this Session.
          * @param lastDeliveredSequenceId
-         *      The Broker Sequence Id of the last message the Consumer delivered.
+         *      The sequenceId of the last Message the consumer delivered.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void removeConsumer( const Pointer<commands::ConsumerId>& consumerId, long long lastDeliveredSequenceId = 0 )
+            throw ( activemq::exceptions::ActiveMQException );
+
+        /**
+         * Adds a MessageProducer to this session, registering it with the Connection and storing
+         * a reference to it so the session can ensure that all resources are closed when
+         * the session is closed.
+         *
+         * @param consumer
+         *      The ActiveMQProducer instance to add to this session.
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void disposeOf( decaf::lang::Pointer<commands::ConsumerId> id, long long lastDeliveredSequenceId )
+        void addProducer( ActiveMQProducer* consumer )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
-         * Dispose of a Producer from this session.  Removes it from the Connection
+         * Dispose of a MessageProducer from this session.  Removes it from the Connection
          * and clean up any resources associated with it.
          *
-         * @param id - the Id of the Producer to dispose.
+         * @param producerId
+         *      The ProducerId of the MessageProducer to remove from this session.
+         *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void disposeOf( decaf::lang::Pointer<commands::ProducerId> id )
+        void removeProducer( const Pointer<commands::ProducerId>& producerId )
             throw ( activemq::exceptions::ActiveMQException );
 
         /**
@@ -581,6 +609,15 @@ namespace core{
         void doStartTransaction() throw ( exceptions::ActiveMQException );
 
         /**
+         * Gets the Pointer to this Session's TransactionContext
+         *
+         * @return a Pointer to this Session's TransactionContext
+         */
+        Pointer<ActiveMQTransactionContext> getTransactionContext() {
+            return this->transaction;
+        }
+
+        /**
          * Request that the Session inform all its consumers to Acknowledge all Message's
          * that have been received so far.
          */
@@ -603,15 +640,19 @@ namespace core{
          */
         void wakeup();
 
-   private:
+        /**
+         * Get the Next available Consumer Id
+         * @return the next id in the sequence.
+         */
+        Pointer<commands::ConsumerId> getNextConsumerId();
 
-       /**
-        * Get the Next available Producer Id
-        * @return the next id in the sequence.
-        */
-       long long getNextProducerId() {
-           return this->producerIds.getNextSequenceId();
-       }
+        /**
+         * Get the Next available Producer Id
+         * @return the next id in the sequence.
+         */
+        Pointer<commands::ProducerId> getNextProducerId();
+
+   private:
 
        /**
         * Get the Next available Producer Sequence Id
@@ -621,28 +662,9 @@ namespace core{
            return this->producerSequenceIds.getNextSequenceId();
        }
 
-       /**
-        * Get the Next available Consumer Id
-        * @return the next id in the sequence.
-        */
-       long long getNextConsumerId() {
-           return this->consumerIds.getNextSequenceId();
-       }
-
        // Checks for the closed state and throws if so.
        void checkClosed() const throw( exceptions::ActiveMQException );
 
-       // Performs the work of creating and configuring a valid Consumer Info, this
-       // can be used both by the normal createConsumer call and by a createDurableConsumer
-       // call as well.  Caller owns the returned ConsumerInfo object.
-       commands::ConsumerInfo* createConsumerInfo(
-           const cms::Destination* destination )
-               throw ( activemq::exceptions::ActiveMQException );
-
-       // Using options from the Destination URI override any settings that are
-       // defined for this consumer.
-       void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
-
        // Send the Destination Creation Request to the Broker, alerting it
        // that we've created a new Temporary Destination.
        // @param tempDestination - The new Temporary Destination

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h?rev=923787&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h Tue Mar 16 14:56:19 2010
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _CMS_MESSAGEENUMERATION_H_
+#define _CMS_MESSAGEENUMERATION_H_
+
+#include <cms/Config.h>
+#include <cms/Message.h>
+#include <cms/CMSException.h>
+
+namespace cms{
+
+    /**
+     * Defines an object that enumerates a collection of Messages.  The client calls
+     * the hasMoreMessages method to determine if a Message is available.  If a Message
+     * is available the client calls the nextMessage method to retrieve that Message,
+     * calling nextMessage when a Message is not available results in an exception.
+     *
+     * @since 2.1
+     */
+    class CMS_API MessageEnumeration {
+    public:
+
+        /**
+         * Returns true if there are more Messages in the Browser that can be retrieved
+         * via the <code>nextMessage</code> method.  If this method returns false and the
+         * <code>nextMessage</code> method is called then an Exception will be thrown.
+         *
+         * @return true if more Messages are available in the Browser.
+         */
+        virtual bool hasMoreMessages() = 0;
+
+        /**
+         * Returns the next Message in the Queue if one is present; if no more Messages are
+         * available then an Exception is thrown.  If a Message object pointer is returned then
+         * that object becomes the property of the caller and must be deleted by the caller
+         * when finished.
+         *
+         * @return The next Message in the Queue.
+         *
+         * @throws CMSException if there are no more Messages currently in the Queue.
+         */
+        virtual cms::Message* nextMessage() throw( cms::CMSException ) = 0;
+
+    };
+}
+
+#endif /* _CMS_MESSAGEENUMERATION_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/MessageEnumeration.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/QueueBrowser.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/QueueBrowser.h?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/QueueBrowser.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/QueueBrowser.h Tue Mar 16 14:56:19 2010
@@ -18,24 +18,35 @@
 #ifndef _CMS_QUEUEBROWSER_H_
 #define _CMS_QUEUEBROWSER_H_
 
-#include <vector>
 #include <string>
 #include <cms/Config.h>
 #include <cms/Closeable.h>
 #include <cms/Queue.h>
 #include <cms/Message.h>
 #include <cms/CMSException.h>
+#include <cms/MessageEnumeration.h>
 
 namespace cms{
 
     /**
      * This class implements in interface for browsing the messages in a Queue
      * without removing them.
-     * <p>
-     * The <code>getEnumeration</code> method of this class returns a static snapshot of
-     * the Queue at the time the method is called.  Since new Message's can be arriving and
-     * old Message's could expire the client should periodically refresh its view by calling
-     * <code>getEnumeration</code> again.
+     *
+     * To browse the contents of the Queue the client calls the <code>getEnumeration</code>
+     * method to retrieve a new instance of a Queue Enumerator.  The client then calls the
+     * hasMoreMessages method of the Enumeration, if it returns true the client can safely
+     * call the nextMessage method of the Enumeration instance.
+     *
+     *      cms::MessageEnumeration* enumeration = queueBrowser->getEnumeration();
+     *
+     *      while( enumeration->hasMoreMessages() ) {
+     *          cms::Message* message = enumeration->nextMessage();
+     *
+     *          // ... Do something with the Message.
+     *
+     *          delete message;
+     *      }
+     *
      *
      * @since 1.1
      */
@@ -60,17 +71,16 @@ namespace cms{
         virtual std::string getMessageSelector() const throw ( cms::CMSException ) = 0;
 
         /**
-         * Gets an enumeration for browsing the current queue messages in the
-         * order they would be received.  The enumeration returned is a static view
-         * of the Queue and is not updated as new Messages arrive, the client should
-         * refresh its enumeration by calling this method again.
+         * Gets a pointer to an Enumeration object for browsing the Messages currently in
+         * the Queue in the order that a client would receive them.  The pointer returned is
+         * owned by the browser and should not be deleted by the client application.
          *
-         * @returns an STL vector for browsing the messages.
+         * @returns a pointer to a Queue Enumeration, this Pointer is owned by the QueueBrowser
+         *          and should not be deleted by the client.
          *
          * @throws CMSException if an internal error occurs.
          */
-        virtual std::vector<const cms::Message*> getEnumeration() const
-            throw ( cms::CMSException ) = 0;
+        virtual cms::MessageEnumeration* getEnumeration() throw( cms::CMSException ) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Tue Mar 16 14:56:19 2010
@@ -23,6 +23,7 @@ cc_sources = \
     activemq/test/ExpirationTest.cpp \
     activemq/test/JmsMessageGroupsTest.cpp \
     activemq/test/MessageCompressionTest.cpp \
+    activemq/test/QueueBrowserTest.cpp \
     activemq/test/SimpleRollbackTest.cpp \
     activemq/test/SimpleTest.cpp \
     activemq/test/SlowListenerTest.cpp \
@@ -36,6 +37,7 @@ cc_sources = \
     activemq/test/openwire/OpenwireIndividualAckTest.cpp \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.cpp \
     activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
+    activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
     activemq/test/openwire/OpenwireSimpleTest.cpp \
     activemq/test/openwire/OpenwireSlowListenerTest.cpp \
@@ -66,6 +68,7 @@ h_sources = \
     activemq/test/ExpirationTest.h \
     activemq/test/JmsMessageGroupsTest.h \
     activemq/test/MessageCompressionTest.h \
+    activemq/test/QueueBrowserTest.h \
     activemq/test/SimpleRollbackTest.h \
     activemq/test/SimpleTest.h \
     activemq/test/SlowListenerTest.h \
@@ -79,6 +82,7 @@ h_sources = \
     activemq/test/openwire/OpenwireIndividualAckTest.h \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.h \
     activemq/test/openwire/OpenwireMessageCompressionTest.h \
+    activemq/test/openwire/OpenwireQueueBrowserTest.h \
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \
     activemq/test/openwire/OpenwireSimpleTest.h \
     activemq/test/openwire/OpenwireSlowListenerTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=923787&r1=923786&r2=923787&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Tue Mar 16 14:56:19 2010
@@ -22,6 +22,7 @@
 #include "activemq/test/openwire/OpenwireExpirationTest.h"
 #include "activemq/test/openwire/OpenwireIndividualAckTest.h"
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
+#include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
 #include "activemq/test/openwire/OpenwireSimpleTest.h"
 #include "activemq/test/openwire/OpenwireTransactionTest.h"
@@ -48,6 +49,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireTransactionTest );



Mime
View raw message