activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r985395 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/core/ main/cms/ test-integration/ test-integration/activemq/test/ test-integration/activemq/test/openwire/ test/ test/activemq/core/
Date Fri, 13 Aug 2010 22:53:53 GMT
Author: tabish
Date: Fri Aug 13 22:53:52 2010
New Revision: 985395

URL: http://svn.apache.org/viewvc?rev=985395&view=rev
Log:
fix for: https://issues.apache.org/activemq/browse/AMQCPP-310

Added:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/FifoMessageDispatchChannelTest.cpp
      - copied, changed from r979266, activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/FifoMessageDispatchChannelTest.h
      - copied, changed from r979266, activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/SimplePriorityMessageDispatchChannelTest.cpp   (with props)
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/SimplePriorityMessageDispatchChannelTest.h   (with props)
Removed:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/MessageDispatchChannelTest.h
Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Message.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/testRegistry.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/Makefile.am Fri Aug 13 22:53:52 2010
@@ -94,9 +94,10 @@ cc_sources = \
     activemq/core/ActiveMQSession.cpp \
     activemq/core/ActiveMQSessionExecutor.cpp \
     activemq/core/ActiveMQTransactionContext.cpp \
-    activemq/core/MessageDispatchChannel.cpp \
+    activemq/core/FifoMessageDispatchChannel.cpp \
     activemq/core/PrefetchPolicy.cpp \
     activemq/core/RedeliveryPolicy.cpp \
+    activemq/core/SimplePriorityMessageDispatchChannel.cpp \
     activemq/core/policies/DefaultPrefetchPolicy.cpp \
     activemq/core/policies/DefaultRedeliveryPolicy.cpp \
     activemq/exceptions/ActiveMQException.cpp \
@@ -230,6 +231,7 @@ cc_sources = \
     cms/InvalidClientIdException.cpp \
     cms/InvalidDestinationException.cpp \
     cms/InvalidSelectorException.cpp \
+    cms/Message.cpp \
     cms/MessageEOFException.cpp \
     cms/MessageFormatException.cpp \
     cms/MessageNotReadableException.cpp \
@@ -492,9 +494,11 @@ h_sources = \
     activemq/core/ActiveMQTransactionContext.h \
     activemq/core/DispatchData.h \
     activemq/core/Dispatcher.h \
+    activemq/core/FifoMessageDispatchChannel.h \
     activemq/core/MessageDispatchChannel.h \
     activemq/core/PrefetchPolicy.h \
     activemq/core/RedeliveryPolicy.h \
+    activemq/core/SimplePriorityMessageDispatchChannel.h \
     activemq/core/Synchronization.h \
     activemq/core/policies/DefaultPrefetchPolicy.h \
     activemq/core/policies/DefaultRedeliveryPolicy.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Fri Aug 13 22:53:52 2010
@@ -104,6 +104,7 @@ namespace core{
         bool dispatchAsync;
         bool alwaysSyncSend;
         bool useAsyncSend;
+        bool messagePrioritySupported;
         bool useCompression;
         int compressionLevel;
         unsigned int sendTimeout;
@@ -127,6 +128,7 @@ namespace core{
                              dispatchAsync( true ),
                              alwaysSyncSend( false ),
                              useAsyncSend( false ),
+                             messagePrioritySupported( true ),
                              useCompression( false ),
                              compressionLevel( -1 ),
                              sendTimeout( 0 ),
@@ -1073,3 +1075,13 @@ long long ActiveMQConnection::getNextLoc
 transport::Transport& ActiveMQConnection::getTransport() const {
     return *( this->config->transport );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isMessagePrioritySupported() const {
+    return this->config->messagePrioritySupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setMessagePrioritySupported( bool value ) {
+    this->config->messagePrioritySupported = value;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Fri Aug 13 22:53:52 2010
@@ -509,6 +509,21 @@ namespace core{
         void setProducerWindowSize( unsigned int windowSize );
 
         /**
+         * @returns true if the Connections that this factory creates should support the
+         * message based priority settings.
+         */
+        bool isMessagePrioritySupported() const;
+
+        /**
+         * Set whether or not this factory should create Connection objects with the Message
+         * priority support function enabled.
+         *
+         * @param value
+         *      Boolean indicating if Message priority should be enabled.
+         */
+        void setMessagePrioritySupported( bool value );
+
+        /**
          * Get the Next available Session Id.
          * @return the next id in the sequence.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Fri Aug 13 22:53:52 2010
@@ -65,6 +65,7 @@ namespace core{
         bool dispatchAsync;
         bool alwaysSyncSend;
         bool useAsyncSend;
+        bool messagePrioritySupported;
         bool useCompression;
         int compressionLevel;
         unsigned int sendTimeout;
@@ -79,6 +80,7 @@ namespace core{
                             dispatchAsync( true ),
                             alwaysSyncSend( false ),
                             useAsyncSend( false ),
+                            messagePrioritySupported( true ),
                             useCompression( false ),
                             compressionLevel( -1 ),
                             sendTimeout( 0 ),
@@ -115,15 +117,18 @@ namespace core{
                     core::ActiveMQConstants::toString(
                         core::ActiveMQConstants::CONNECTION_USECOMPRESSION ), "false" ) );
 
-            this->compressionLevel = decaf::lang::Integer::parseInt(
+            this->compressionLevel = Integer::parseInt(
                 properties->getProperty( "connection.compressionLevel", "-1" ) );
 
+            this->messagePrioritySupported = Boolean::parseBoolean(
+                properties->getProperty( "connection.messagePrioritySupported", "true" ) );
+
             this->dispatchAsync = Boolean::parseBoolean(
                 properties->getProperty(
                     core::ActiveMQConstants::toString(
                         core::ActiveMQConstants::CONNECTION_DISPATCHASYNC ), "true" ) );
 
-            this->producerWindowSize = decaf::lang::Integer::parseInt(
+            this->producerWindowSize = Integer::parseInt(
                 properties->getProperty(
                     core::ActiveMQConstants::toString(
                         core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE ), "0" ) );
@@ -325,6 +330,7 @@ void ActiveMQConnectionFactory::configur
     connection->setProducerWindowSize( this->settings->producerWindowSize );
     connection->setPrefetchPolicy( this->settings->defaultPrefetchPolicy->clone() );
     connection->setRedeliveryPolicy( this->settings->defaultRedeliveryPolicy->clone() );
+    connection->setMessagePrioritySupported( this->settings->messagePrioritySupported );
 
     if( this->settings->defaultListener ) {
         connection->setExceptionListener( this->settings->defaultListener );
@@ -492,3 +498,13 @@ unsigned int ActiveMQConnectionFactory::
 void ActiveMQConnectionFactory::setProducerWindowSize( unsigned int windowSize ) {
     this->settings->producerWindowSize = windowSize;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isMessagePrioritySupported() const {
+    return this->settings->messagePrioritySupported;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setMessagePrioritySupported( bool value ) {
+    this->settings->messagePrioritySupported = value;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Fri Aug 13 22:53:52 2010
@@ -344,6 +344,21 @@ namespace core{
          */
         void setProducerWindowSize( unsigned int windowSize );
 
+        /**
+         * @returns true if the Connections that this factory creates should support the
+         * message based priority settings.
+         */
+        bool isMessagePrioritySupported() const;
+
+        /**
+         * Set whether or not this factory should create Connection objects with the Message
+         * priority support function enabled.
+         *
+         * @param value
+         *      Boolean indicating if Message priority should be enabled.
+         */
+        void setMessagePrioritySupported( bool value );
+
     public:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Fri Aug 13 22:53:52 2010
@@ -39,6 +39,9 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/core/FifoMessageDispatchChannel.h>
+#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
+#include <activemq/core/RedeliveryPolicy.h>
 #include <cms/ExceptionListener.h>
 #include <memory>
 
@@ -245,6 +248,12 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     this->redeliveryDelay = 0;
     this->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
 
+    if( this->session->getConnection()->isMessagePrioritySupported() ) {
+        this->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() );
+    } else {
+        this->unconsumedMessages.reset( new FifoMessageDispatchChannel() );
+    }
+
     if( listener != NULL ) {
         this->setMessageListener( listener );
     }
@@ -263,19 +272,24 @@ ActiveMQConsumer::~ActiveMQConsumer() th
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::start() {
 
-    if( this->unconsumedMessages.isClosed() ) {
+    if( this->unconsumedMessages->isClosed() ) {
         return;
     }
 
     this->started.set( true );
-    this->unconsumedMessages.start();
+    this->unconsumedMessages->start();
     this->session->wakeup();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::stop() {
     this->started.set( false );
-    this->unconsumedMessages.stop();
+    this->unconsumedMessages->stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumer::isClosed() const {
+    return this->unconsumedMessages->isClosed();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -327,7 +341,7 @@ void ActiveMQConsumer::doClose() {
 
             // Purge all the pending messages
             try{
-                unconsumedMessages.clear();
+                unconsumedMessages->clear();
             } catch ( ActiveMQException& ex ){
                 if( !haveException ){
                     ex.setMark( __FILE__, __LINE__ );
@@ -337,7 +351,7 @@ void ActiveMQConsumer::doClose() {
             }
 
             // Stop and Wakeup all sync consumers.
-            unconsumedMessages.close();
+            unconsumedMessages->close();
 
             if( this->session->isIndividualAcknowledge() ) {
                 // For IndividualAck Mode we need to unlink the ack handler to remove a
@@ -401,10 +415,10 @@ decaf::lang::Pointer<MessageDispatch> Ac
         // Loop until the time is up or we get a non-expired message
         while( true ) {
 
-            Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeue( timeout );
+            Pointer<MessageDispatch> dispatch = unconsumedMessages->dequeue( timeout );
             if( dispatch == NULL ) {
 
-                if( timeout > 0 && !unconsumedMessages.isClosed() ) {
+                if( timeout > 0 && !unconsumedMessages->isClosed() ) {
                     timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
                 } else {
                     return Pointer<MessageDispatch>();
@@ -560,7 +574,7 @@ void ActiveMQConsumer::setMessageListene
                 this->listener = listener;
             }
 
-            session->redispatch( unconsumedMessages );
+            session->redispatch( *unconsumedMessages );
 
             if( wasStarted ) {
                 session->start();
@@ -609,7 +623,7 @@ void ActiveMQConsumer::afterMessageIsCon
 
     try{
 
-        if( unconsumedMessages.isClosed() ) {
+        if( unconsumedMessages->isClosed() ) {
             return;
         }
 
@@ -872,7 +886,7 @@ void ActiveMQConsumer::commit() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::rollback() {
 
-    synchronized( &unconsumedMessages ) {
+    synchronized( unconsumedMessages.get() ) {
 
         synchronized( &dispatchedMessages ) {
             if( dispatchedMessages.empty() ) {
@@ -933,15 +947,15 @@ void ActiveMQConsumer::rollback() {
                 }
 
                 // stop the delivery of messages.
-                unconsumedMessages.stop();
+                unconsumedMessages->stop();
 
                 std::auto_ptr< Iterator< Pointer<MessageDispatch> > > iter( dispatchedMessages.iterator() );
 
                 while( iter->hasNext() ) {
-                    unconsumedMessages.enqueueFirst( iter->next() );
+                    unconsumedMessages->enqueueFirst( iter->next() );
                 }
 
-                if (redeliveryDelay > 0 && !unconsumedMessages.isClosed()) {
+                if (redeliveryDelay > 0 && !unconsumedMessages->isClosed()) {
                     // TODO
                     // Start up the delivery again a little later.
                     //scheduler.executeAfterDelay(new Runnable() {
@@ -967,7 +981,7 @@ void ActiveMQConsumer::rollback() {
     }
 
     if( this->listener != NULL ) {
-        session->redispatch( unconsumedMessages );
+        session->redispatch( *unconsumedMessages );
     }
 }
 
@@ -976,17 +990,17 @@ void ActiveMQConsumer::dispatch( const P
 
     try {
 
-        synchronized( &unconsumedMessages ) {
+        synchronized( unconsumedMessages.get() ) {
 
             clearMessagesInProgress();
             if( this->clearDispatchList ) {
                 // we are reconnecting so lets flush the in progress
                 // messages
                 clearDispatchList = false;
-                unconsumedMessages.clear();
+                unconsumedMessages->clear();
             }
 
-            if( !unconsumedMessages.isClosed() ) {
+            if( !unconsumedMessages->isClosed() ) {
 
                 // Don't dispatch expired messages, ack it and then destroy it
                 if( dispatch->getMessage()->isExpired() ) {
@@ -998,7 +1012,7 @@ void ActiveMQConsumer::dispatch( const P
 
                 synchronized( &listenerMutex ) {
                     // If we have a listener, send the message.
-                    if( this->listener != NULL && unconsumedMessages.isRunning() ) {
+                    if( this->listener != NULL && unconsumedMessages->isRunning() ) {
 
                         // Preprocessing.
                         beforeMessageIsConsumed( dispatch );
@@ -1013,7 +1027,7 @@ void ActiveMQConsumer::dispatch( const P
                     } else {
 
                         // No listener, add it to the unconsumed messages list
-                        this->unconsumedMessages.enqueue( dispatch );
+                        this->unconsumedMessages->enqueue( dispatch );
                     }
                 }
             }
@@ -1032,7 +1046,7 @@ void ActiveMQConsumer::sendPullRequest( 
         this->checkClosed();
 
         // There are still local message, consume them first.
-        if( !this->unconsumedMessages.isEmpty() ) {
+        if( !this->unconsumedMessages->isEmpty() ) {
             return;
         }
 
@@ -1067,7 +1081,7 @@ bool ActiveMQConsumer::iterate() {
 
         if( this->listener != NULL ) {
 
-            Pointer<MessageDispatch> dispatch = unconsumedMessages.dequeueNoWait();
+            Pointer<MessageDispatch> dispatch = unconsumedMessages->dequeueNoWait();
             if( dispatch != NULL ) {
 
                 try {
@@ -1098,7 +1112,7 @@ void ActiveMQConsumer::inProgressClearRe
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::clearMessagesInProgress() {
     if( inProgressClearRequiredFlag ) {
-        synchronized( &unconsumedMessages ) {
+        synchronized( unconsumedMessages.get() ) {
             if( inProgressClearRequiredFlag ) {
 
                 // TODO - Rollback duplicates.
@@ -1124,7 +1138,7 @@ bool ActiveMQConsumer::isAutoAcknowledge
 
 ////////////////////////////////////////////////////////////////////////////////
 int ActiveMQConsumer::getMessageAvailableCount() const {
-    return this->unconsumedMessages.size();
+    return this->unconsumedMessages->size();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1201,3 +1215,15 @@ void ActiveMQConsumer::applyDestinationO
                 options.getProperty( networkSubscriptionStr ) ) );
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::setRedeliveryPolicy( RedeliveryPolicy* policy ) {
+    if( policy != NULL ) {
+        this->redeliveryPolicy.reset( policy );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+RedeliveryPolicy* ActiveMQConsumer::getRedeliveryPolicy() const {
+    return this->redeliveryPolicy.get();
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Fri Aug 13 22:53:52 2010
@@ -28,21 +28,19 @@
 #include <activemq/commands/MessageAck.h>
 #include <activemq/commands/MessageDispatch.h>
 #include <activemq/core/Dispatcher.h>
-#include <activemq/core/MessageDispatchChannel.h>
 #include <activemq/core/RedeliveryPolicy.h>
+#include <activemq/core/MessageDispatchChannel.h>
 
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlQueue.h>
 #include <decaf/util/concurrent/Mutex.h>
-#include <memory>
 
 namespace activemq{
 namespace core{
 
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicBoolean;
-    using activemq::core::MessageDispatchChannel;
 
     class ActiveMQSession;
 
@@ -84,7 +82,7 @@ namespace core{
         /**
          * Queue of unconsumed messages.
          */
-        MessageDispatchChannel unconsumedMessages;
+        Pointer<MessageDispatchChannel> unconsumedMessages;
 
         /**
          * Queue of consumed messages.
@@ -134,7 +132,7 @@ namespace core{
         /**
          * The policy to use when Message Redelivery is in progress.
          */
-        std::auto_ptr<RedeliveryPolicy> redeliveryPolicy;
+        Pointer<RedeliveryPolicy> redeliveryPolicy;
 
     private:
 
@@ -239,9 +237,7 @@ namespace core{
         /**
          * @returns if this Consumer has been closed.
          */
-        bool isClosed() const {
-            return this->unconsumedMessages.isClosed();
-        }
+        bool isClosed() const;
 
         /**
          * Has this Consumer Transaction Synchronization been added to the transaction
@@ -317,11 +313,7 @@ namespace core{
          * @param policy
          *      Pointer to a Redelivery Policy object that his Consumer will use.
          */
-        void setRedeliveryPolicy( RedeliveryPolicy* policy ) {
-            if( policy != NULL ) {
-                this->redeliveryPolicy.reset( policy );
-            }
-        }
+        void setRedeliveryPolicy( RedeliveryPolicy* policy );
 
         /**
          * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
@@ -329,9 +321,7 @@ namespace core{
          *
          * @returns a Pointer to a RedeliveryPolicy that is in use by this Consumer.
          */
-        RedeliveryPolicy* getRedeliveryPolicy() const {
-            return this->redeliveryPolicy.get();
-        }
+        RedeliveryPolicy* getRedeliveryPolicy() const;
 
     protected:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Fri Aug 13 22:53:52 2010
@@ -16,6 +16,7 @@
  */
 #include "ActiveMQProducer.h"
 
+#include <cms/Message.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/commands/RemoveInfo.h>
@@ -73,11 +74,11 @@ ActiveMQProducer::ActiveMQProducer( Acti
     this->closed = false;
 
     // Default the Delivery options
-    this->defaultDeliveryMode = cms::DeliveryMode::PERSISTENT;
+    this->defaultDeliveryMode = cms::Message::DEFAULT_DELIVERY_MODE;
     this->disableTimestamps = false;
     this->disableMessageId = false;
-    this->defaultPriority = 4;
-    this->defaultTimeToLive = 0;
+    this->defaultPriority = cms::Message::DEFAULT_MSG_PRIORITY;
+    this->defaultTimeToLive = cms::Message::DEFAULT_TIME_TO_LIVE;
     this->sendTimeout = sendTimeout;
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Fri Aug 13 22:53:52 2010
@@ -16,9 +16,12 @@
  */
 
 #include "ActiveMQSessionExecutor.h"
-#include "ActiveMQSession.h"
-#include "ActiveMQConsumer.h"
 
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/FifoMessageDispatchChannel.h>
+#include <activemq/core/SimplePriorityMessageDispatchChannel.h>
 #include <activemq/commands/ConsumerInfo.h>
 #include <activemq/threads/DedicatedTaskRunner.h>
 
@@ -35,6 +38,12 @@ using namespace decaf::util::concurrent;
 ActiveMQSessionExecutor::ActiveMQSessionExecutor( ActiveMQSession* session ) {
 
     this->session = session;
+
+    if( this->session->getConnection()->isMessagePrioritySupported() ) {
+        this->messageQueue.reset( new SimplePriorityMessageDispatchChannel() );
+    } else {
+        this->messageQueue.reset( new FifoMessageDispatchChannel() );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -58,7 +67,7 @@ ActiveMQSessionExecutor::~ActiveMQSessio
 void ActiveMQSessionExecutor::execute( const Pointer<MessageDispatch>& dispatch ) {
 
     // Add the data to the queue.
-    this->messageQueue.enqueue( dispatch );
+    this->messageQueue->enqueue( dispatch );
     this->wakeup();
 }
 
@@ -66,7 +75,7 @@ void ActiveMQSessionExecutor::execute( c
 void ActiveMQSessionExecutor::executeFirst( const Pointer<MessageDispatch>& dispatch ) {
 
     // Add the data to the queue.
-    this->messageQueue.enqueueFirst( dispatch );
+    this->messageQueue->enqueueFirst( dispatch );
     this->wakeup();
 }
 
@@ -74,7 +83,7 @@ void ActiveMQSessionExecutor::executeFir
 void ActiveMQSessionExecutor::wakeup() {
 
     Pointer<TaskRunner> taskRunner = this->taskRunner;
-    synchronized( &messageQueue ) {
+    synchronized( messageQueue.get() ) {
         if( this->taskRunner == NULL ) {
             this->taskRunner.reset( new DedicatedTaskRunner( this ) );
         }
@@ -88,9 +97,9 @@ void ActiveMQSessionExecutor::wakeup() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::start() {
 
-    if( !messageQueue.isRunning() ) {
+    if( !messageQueue->isRunning() ) {
 
-        messageQueue.start();
+        messageQueue->start();
         if( hasUncomsumedMessages() ) {
             this->wakeup();
         }
@@ -100,8 +109,8 @@ void ActiveMQSessionExecutor::start() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::stop() {
 
-    if( messageQueue.isRunning() ) {
-        messageQueue.stop();
+    if( messageQueue->isRunning() ) {
+        messageQueue->stop();
         Pointer<TaskRunner> taskRunner = this->taskRunner;
         if( taskRunner != NULL ) {
             this->taskRunner.reset( NULL );
@@ -161,10 +170,10 @@ bool ActiveMQSessionExecutor::iterate() 
 
         // No messages left queued on the listeners.. so now dispatch messages
         // queued on the session
-        Pointer<MessageDispatch> message = messageQueue.dequeueNoWait();
+        Pointer<MessageDispatch> message = messageQueue->dequeueNoWait();
         if( message != NULL ) {
             dispatch( message );
-            return !messageQueue.isEmpty();
+            return !messageQueue->isEmpty();
         }
 
         return false;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Fri Aug 13 22:53:52 2010
@@ -46,7 +46,7 @@ namespace core{
         ActiveMQSession* session;
 
         /** The Channel that holds the waiting Messages for Dispatching. */
-        MessageDispatchChannel messageQueue;
+        Pointer<MessageDispatchChannel> messageQueue;
 
         /** The Dispatcher TaskRunner */
         Pointer<activemq::threads::TaskRunner> taskRunner;
@@ -86,14 +86,14 @@ namespace core{
          * Removes all messages in the Dispatch Channel so that non are delivered.
          */
         virtual void clearMessagesInProgress() {
-            this->messageQueue.clear();
+            this->messageQueue->clear();
         }
 
         /**
          * @return true if there are any pending messages in the dispatch channel.
          */
         virtual bool hasUncomsumedMessages() const {
-            return !messageQueue.isClosed() && messageQueue.isRunning() && !messageQueue.isEmpty();
+            return !messageQueue->isClosed() && messageQueue->isRunning() && !messageQueue->isEmpty();
         }
 
         /**
@@ -116,28 +116,28 @@ namespace core{
          * usable.
          */
         virtual void close() {
-            this->messageQueue.close();
+            this->messageQueue->close();
         }
 
         /**
          * @return true indicates if the executor is started
          */
         virtual bool isRunning() const {
-            return this->messageQueue.isRunning();
+            return this->messageQueue->isRunning();
         }
 
         /**
          * @return true if there are no messages in the Dispatch Channel.
          */
         virtual bool isEmpty() {
-            return messageQueue.isEmpty();
+            return messageQueue->isEmpty();
         }
 
         /**
          * Removes all queued messages and destroys them.
          */
         virtual void clear() {
-            this->messageQueue.clear();
+            this->messageQueue->clear();
         }
 
         /**
@@ -153,7 +153,7 @@ namespace core{
          *          Message Dispatch Channel when called.
          */
         std::vector< Pointer<MessageDispatch> > getUnconsumedMessages() {
-            return messageQueue.removeAll();
+            return messageQueue->removeAll();
         }
 
     private:

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp Fri Aug 13 22:53:52 2010
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "FifoMessageDispatchChannel.h"
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+FifoMessageDispatchChannel::FifoMessageDispatchChannel() : closed( false ), running( false ), channel() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+FifoMessageDispatchChannel::~FifoMessageDispatchChannel() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::enqueue( const Pointer<MessageDispatch>& message ) {
+    synchronized( &channel ) {
+        channel.push( message );
+        channel.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::enqueueFirst( const Pointer<MessageDispatch>& message ) {
+    synchronized( &channel ) {
+        channel.enqueueFront( message );
+        channel.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool FifoMessageDispatchChannel::isEmpty() const {
+    synchronized( &channel ) {
+        return channel.empty();
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> FifoMessageDispatchChannel::dequeue( long long timeout ) {
+
+    synchronized( &channel ) {
+        // Wait until the channel is ready to deliver messages.
+        while( timeout != 0 && !closed && ( channel.empty() || !running ) ) {
+            if( timeout == -1 ) {
+                channel.wait();
+            } else {
+                channel.wait( (unsigned long)timeout );
+                break;
+            }
+        }
+
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+
+        return channel.pop();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> FifoMessageDispatchChannel::dequeueNoWait() {
+    synchronized( &channel ) {
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return channel.pop();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> FifoMessageDispatchChannel::peek() const {
+    synchronized( &channel ) {
+        if( closed || !running || channel.empty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return channel.front();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::start() {
+    synchronized( &channel ) {
+        if( !closed ) {
+            running = true;
+            channel.notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::stop() {
+    synchronized( &channel ) {
+        running = false;
+        channel.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::close() {
+    synchronized( &channel ) {
+        if( !closed ) {
+            running = false;
+            closed = true;
+        }
+        channel.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void FifoMessageDispatchChannel::clear() {
+    synchronized( &channel ) {
+        channel.clear();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int FifoMessageDispatchChannel::size() const {
+    synchronized( &channel ) {
+        return (int)channel.size();
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<MessageDispatch> > FifoMessageDispatchChannel::removeAll() {
+    std::vector< Pointer<MessageDispatch> > result;
+
+    synchronized( &channel ) {
+        result = channel.toArray();
+        channel.clear();
+    }
+
+    return result;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h Fri Aug 13 22:53:52 2010
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_FIFOMESSAGEDISPATCHCHANNEL_H_
+#define _ACTIVEMQ_CORE_FIFOMESSAGEDISPATCHCHANNEL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/core/MessageDispatchChannel.h>
+
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+    class AMQCPP_API FifoMessageDispatchChannel : public MessageDispatchChannel {
+    private:
+
+        bool closed;
+        bool running;
+
+        mutable decaf::util::StlQueue< Pointer<MessageDispatch> > channel;
+
+    private:
+
+        FifoMessageDispatchChannel( const FifoMessageDispatchChannel& );
+        FifoMessageDispatchChannel& operator= ( const FifoMessageDispatchChannel& );
+
+    public:
+
+        FifoMessageDispatchChannel();
+
+        virtual ~FifoMessageDispatchChannel();
+
+        virtual void enqueue( const Pointer<MessageDispatch>& message );
+
+        virtual void enqueueFirst( const Pointer<MessageDispatch>& message );
+
+        virtual bool isEmpty() const;
+
+        virtual bool isClosed() const {
+            return this->closed;
+        }
+
+        virtual bool isRunning() const {
+            return this->running;
+        }
+
+        virtual Pointer<MessageDispatch> dequeue( long long timeout );
+
+        virtual Pointer<MessageDispatch> dequeueNoWait();
+
+        virtual Pointer<MessageDispatch> peek() const;
+
+        virtual void start();
+
+        virtual void stop();
+
+        virtual void close();
+
+        virtual void clear();
+
+        virtual int size() const;
+
+        virtual std::vector< Pointer<MessageDispatch> > removeAll();
+
+    public:
+
+        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
+            channel.lock();
+        }
+
+        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
+            return channel.tryLock();
+        }
+
+        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
+            channel.unlock();
+        }
+
+        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
+                                   decaf::lang::exceptions::IllegalMonitorStateException,
+                                   decaf::lang::exceptions::InterruptedException ) {
+
+            channel.wait();
+        }
+
+        virtual void wait( long long millisecs )
+            throw( decaf::lang::exceptions::RuntimeException,
+                   decaf::lang::exceptions::IllegalMonitorStateException,
+                   decaf::lang::exceptions::InterruptedException ) {
+
+            channel.wait( millisecs );
+        }
+
+        virtual void wait( long long millisecs, int nanos )
+            throw( decaf::lang::exceptions::RuntimeException,
+                   decaf::lang::exceptions::IllegalArgumentException,
+                   decaf::lang::exceptions::IllegalMonitorStateException,
+                   decaf::lang::exceptions::InterruptedException ) {
+
+            channel.wait( millisecs, nanos );
+        }
+
+        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
+                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
+
+            channel.notify();
+        }
+
+        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
+                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
+
+            channel.notifyAll();
+        }
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_FIFOMESSAGEDISPATCHCHANNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/FifoMessageDispatchChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/MessageDispatchChannel.h Fri Aug 13 22:53:52 2010
@@ -33,56 +33,38 @@ namespace core {
     using activemq::commands::MessageDispatch;
 
     class AMQCPP_API MessageDispatchChannel : public decaf::util::concurrent::Synchronizable {
-    private:
-
-        bool closed;
-        bool running;
-
-        mutable decaf::util::StlQueue< Pointer<MessageDispatch> > channel;
-
-    private:
-
-        MessageDispatchChannel( const MessageDispatchChannel& );
-        MessageDispatchChannel& operator= ( const MessageDispatchChannel& );
-
     public:
 
-        MessageDispatchChannel();
-
-        virtual ~MessageDispatchChannel();
+        virtual ~MessageDispatchChannel() {}
 
         /**
          * Add a Message to the Channel behind all pending message.
          *
          * @param message - The message to add to the Channel.
          */
-        void enqueue( const Pointer<MessageDispatch>& message );
+        virtual void enqueue( const Pointer<MessageDispatch>& message ) = 0;
 
         /**
          * Add a message to the front of the Channel.
          *
          * @param message - The Message to add to the front of the Channel.
          */
-        void enqueueFirst( const Pointer<MessageDispatch>& message );
+        virtual void enqueueFirst( const Pointer<MessageDispatch>& message ) = 0;
 
         /**
          * @return true if there are no messages in the Channel.
          */
-        bool isEmpty() const;
+        virtual bool isEmpty() const = 0;
 
         /**
          * @return has the Queue been closed.
          */
-        bool isClosed() const {
-            return this->closed;
-        }
+        virtual bool isClosed() const = 0;
 
         /**
          * @return true if the Channel currently running and will dequeue message.
          */
-        bool isRunning() const {
-            return this->running;
-        }
+        virtual bool isRunning() const = 0;
 
         /**
          * Used to get an enqueued message. The amount of time this method blocks is
@@ -95,7 +77,7 @@ namespace core {
          * @return null if we timeout or if the consumer is closed.
          * @throws ActiveMQException
          */
-        Pointer<MessageDispatch> dequeue( long long timeout );
+        virtual Pointer<MessageDispatch> dequeue( long long timeout ) = 0;
 
         /**
          * Used to get an enqueued message if there is one queued right now.  If there is
@@ -103,7 +85,7 @@ namespace core {
          *
          * @return a message if there is one in the queue.
          */
-        Pointer<MessageDispatch> dequeueNoWait();
+        virtual Pointer<MessageDispatch> dequeueNoWait() = 0;
 
         /**
          * Peek in the Queue and return the first message in the Channel without removing
@@ -111,32 +93,32 @@ namespace core {
          *
          * @return a message if there is one in the queue.
          */
-        Pointer<MessageDispatch> peek() const;
+        virtual Pointer<MessageDispatch> peek() const = 0;
 
         /**
          * Starts dispatch of messages from the Channel.
          */
-        void start();
+        virtual void start() = 0;
 
         /**
          * Stops dispatch of message from the Channel.
          */
-        void stop();
+        virtual void stop() = 0;
 
         /**
          * Close this channel no messages will be dispatched after this method is called.
          */
-        void close();
+        virtual void close() = 0;
 
         /**
          * Clear the Channel, all pending messages are removed.
          */
-        void clear();
+        virtual void clear() = 0;
 
         /**
          * @return the number of Messages currently in the Channel.
          */
-        int size() const;
+        virtual int size() const = 0;
 
         /**
          * Remove all messages that are currently in the Channel and return them as
@@ -144,57 +126,7 @@ namespace core {
          *
          * @return a list of Messages that was previously in the Channel.
          */
-        std::vector< Pointer<MessageDispatch> > removeAll();
-
-    public:
-
-        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
-            channel.lock();
-        }
-
-        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
-            return channel.tryLock();
-        }
-
-        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
-            channel.unlock();
-        }
-
-        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
-                                   decaf::lang::exceptions::IllegalMonitorStateException,
-                                   decaf::lang::exceptions::InterruptedException ) {
-
-            channel.wait();
-        }
-
-        virtual void wait( long long millisecs )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            channel.wait( millisecs );
-        }
-
-        virtual void wait( long long millisecs, int nanos )
-            throw( decaf::lang::exceptions::RuntimeException,
-                   decaf::lang::exceptions::IllegalArgumentException,
-                   decaf::lang::exceptions::IllegalMonitorStateException,
-                   decaf::lang::exceptions::InterruptedException ) {
-
-            channel.wait( millisecs, nanos );
-        }
-
-        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
-                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            channel.notify();
-        }
-
-        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
-                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
-
-            channel.notifyAll();
-        }
+        virtual std::vector< Pointer<MessageDispatch> > removeAll() = 0;
 
     };
 

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp Fri Aug 13 22:53:52 2010
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "SimplePriorityMessageDispatchChannel.h"
+
+#include <cms/Message.h>
+
+#include <decaf/lang/Math.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+const int SimplePriorityMessageDispatchChannel::MAX_PRIORITIES = 10;
+
+////////////////////////////////////////////////////////////////////////////////
+SimplePriorityMessageDispatchChannel::SimplePriorityMessageDispatchChannel() :
+    closed( false ), running( false ), mutex(), channels( MAX_PRIORITIES ), enqueued( 0 ) {
+
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SimplePriorityMessageDispatchChannel::~SimplePriorityMessageDispatchChannel() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::enqueue( const Pointer<MessageDispatch>& message ) {
+    synchronized( &mutex ) {
+        this->getChannel( message ).push( message );
+        this->enqueued++;
+        mutex.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::enqueueFirst( const Pointer<MessageDispatch>& message ) {
+    synchronized( &mutex ) {
+        this->getChannel( message ).enqueueFront( message );
+        this->enqueued++;
+        mutex.notify();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool SimplePriorityMessageDispatchChannel::isEmpty() const {
+    return this->enqueued == 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> SimplePriorityMessageDispatchChannel::dequeue( long long timeout ) {
+
+    synchronized( &mutex ) {
+        // Wait until the channel is ready to deliver messages.
+        while( timeout != 0 && !closed && ( isEmpty() || !running ) ) {
+            if( timeout == -1 ) {
+                mutex.wait();
+            } else {
+                mutex.wait( (unsigned long)timeout );
+                break;
+            }
+        }
+
+        if( closed || !running || isEmpty() ) {
+            return Pointer<MessageDispatch>();
+        }
+
+        return removeFirst();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> SimplePriorityMessageDispatchChannel::dequeueNoWait() {
+    synchronized( &mutex ) {
+        if( closed || !running || isEmpty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return removeFirst();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> SimplePriorityMessageDispatchChannel::peek() const {
+    synchronized( &mutex ) {
+        if( closed || !running || isEmpty() ) {
+            return Pointer<MessageDispatch>();
+        }
+        return getFirst();
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::start() {
+    synchronized( &mutex ) {
+        if( !closed ) {
+            running = true;
+            mutex.notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::stop() {
+    synchronized( &mutex ) {
+        running = false;
+        mutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::close() {
+    synchronized( &mutex ) {
+        if( !closed ) {
+            running = false;
+            closed = true;
+        }
+        mutex.notifyAll();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void SimplePriorityMessageDispatchChannel::clear() {
+    synchronized( &mutex ) {
+        for( int i = 0; i < MAX_PRIORITIES; i++ ) {
+            this->channels[i].clear();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int SimplePriorityMessageDispatchChannel::size() const {
+    synchronized( &mutex ) {
+        return this->enqueued;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+std::vector< Pointer<MessageDispatch> > SimplePriorityMessageDispatchChannel::removeAll() {
+    std::vector< Pointer<MessageDispatch> > result;
+
+    synchronized( &mutex ) {
+        for( int i = MAX_PRIORITIES - 1; i >= 0; --i ) {
+            std::vector< Pointer<MessageDispatch> > temp( channels[i].toArray() );
+            result.insert( result.end(), temp.begin(), temp.end() );
+            this->enqueued -= temp.size();
+            channels[i].clear();
+        }
+    }
+
+    return result;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StlQueue< Pointer<MessageDispatch> >& SimplePriorityMessageDispatchChannel::getChannel( const Pointer<MessageDispatch>& dispatch ) {
+
+    int priority = cms::Message::DEFAULT_MSG_PRIORITY;
+
+    if( dispatch->getMessage() != NULL ) {
+        priority = Math::max( dispatch->getMessage()->getPriority(), 0 );
+        priority = Math::min( dispatch->getMessage()->getPriority(), 9 );
+    }
+
+    return this->channels[priority];
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> SimplePriorityMessageDispatchChannel::removeFirst() {
+
+    if( this->enqueued > 0 ) {
+        for( int i = MAX_PRIORITIES - 1; i >= 0; i-- ) {
+            StlQueue< Pointer<MessageDispatch> >& channel = channels[i];
+            if( !channel.empty() ) {
+                this->enqueued--;
+                return channel.pop();
+            }
+        }
+    }
+
+    return Pointer<MessageDispatch>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<MessageDispatch> SimplePriorityMessageDispatchChannel::getFirst() const {
+
+    if( this->enqueued > 0 ) {
+        for( int i = MAX_PRIORITIES - 1; i >= 0; i-- ) {
+            StlQueue< Pointer<MessageDispatch> >& channel = channels[i];
+            if( !channel.empty() ) {
+                return channel.front();
+            }
+        }
+    }
+
+    return Pointer<MessageDispatch>();
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h Fri Aug 13 22:53:52 2010
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_SIMPLEPRIORITYMESSAGEDISPATCHCHANNEL_H_
+#define _ACTIVEMQ_CORE_SIMPLEPRIORITYMESSAGEDISPATCHCHANNEL_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/core/MessageDispatchChannel.h>
+
+#include <decaf/util/StlQueue.h>
+#include <decaf/lang/ArrayPointer.h>
+#include <decaf/util/concurrent/Mutex.h>
+
+namespace activemq {
+namespace core {
+
+    using decaf::lang::ArrayPointer;
+
+    class SimplePriorityMessageDispatchChannel : public MessageDispatchChannel {
+    private:
+
+        static const int MAX_PRIORITIES;
+
+        bool closed;
+        bool running;
+
+        mutable decaf::util::concurrent::Mutex mutex;
+
+        mutable ArrayPointer< decaf::util::StlQueue< Pointer<MessageDispatch> > > channels;
+
+        int enqueued;
+
+    private:
+
+        SimplePriorityMessageDispatchChannel( const SimplePriorityMessageDispatchChannel& );
+        SimplePriorityMessageDispatchChannel& operator= ( const SimplePriorityMessageDispatchChannel& );
+
+    public:
+
+        SimplePriorityMessageDispatchChannel();
+        virtual ~SimplePriorityMessageDispatchChannel();
+
+        virtual void enqueue( const Pointer<MessageDispatch>& message );
+
+        virtual void enqueueFirst( const Pointer<MessageDispatch>& message );
+
+        virtual bool isEmpty() const;
+
+        virtual bool isClosed() const {
+            return this->closed;
+        }
+
+        virtual bool isRunning() const {
+            return this->running;
+        }
+
+        virtual Pointer<MessageDispatch> dequeue( long long timeout );
+
+        virtual Pointer<MessageDispatch> dequeueNoWait();
+
+        virtual Pointer<MessageDispatch> peek() const;
+
+        virtual void start();
+
+        virtual void stop();
+
+        virtual void close();
+
+        virtual void clear();
+
+        virtual int size() const;
+
+        virtual std::vector< Pointer<MessageDispatch> > removeAll();
+
+    public:
+
+        virtual void lock() throw( decaf::lang::exceptions::RuntimeException ) {
+            mutex.lock();
+        }
+
+        virtual bool tryLock() throw( decaf::lang::exceptions::RuntimeException ) {
+            return mutex.tryLock();
+        }
+
+        virtual void unlock() throw( decaf::lang::exceptions::RuntimeException ) {
+            mutex.unlock();
+        }
+
+        virtual void wait() throw( decaf::lang::exceptions::RuntimeException,
+                                   decaf::lang::exceptions::IllegalMonitorStateException,
+                                   decaf::lang::exceptions::InterruptedException ) {
+
+            mutex.wait();
+        }
+
+        virtual void wait( long long millisecs )
+            throw( decaf::lang::exceptions::RuntimeException,
+                   decaf::lang::exceptions::IllegalMonitorStateException,
+                   decaf::lang::exceptions::InterruptedException ) {
+
+            mutex.wait( millisecs );
+        }
+
+        virtual void wait( long long millisecs, int nanos )
+            throw( decaf::lang::exceptions::RuntimeException,
+                   decaf::lang::exceptions::IllegalArgumentException,
+                   decaf::lang::exceptions::IllegalMonitorStateException,
+                   decaf::lang::exceptions::InterruptedException ) {
+
+            mutex.wait( millisecs, nanos );
+        }
+
+        virtual void notify() throw( decaf::lang::exceptions::RuntimeException,
+                                     decaf::lang::exceptions::IllegalMonitorStateException ) {
+
+            mutex.notify();
+        }
+
+        virtual void notifyAll() throw( decaf::lang::exceptions::RuntimeException,
+                                        decaf::lang::exceptions::IllegalMonitorStateException ) {
+
+            mutex.notifyAll();
+        }
+
+    private:
+
+        decaf::util::StlQueue< Pointer<MessageDispatch> >& getChannel( const Pointer<MessageDispatch>& dispatch );
+
+        Pointer<MessageDispatch> removeFirst();
+
+        Pointer<MessageDispatch> getFirst() const;
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_SIMPLEPRIORITYMESSAGEDISPATCHCHANNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/SimplePriorityMessageDispatchChannel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Message.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Message.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Message.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Message.cpp Fri Aug 13 22:53:52 2010
@@ -24,5 +24,5 @@ using namespace cms;
 ////////////////////////////////////////////////////////////////////////////////
 const int Message::DEFAULT_DELIVERY_MODE = cms::DeliveryMode::PERSISTENT;
 const int Message::DEFAULT_MSG_PRIORITY = 4;
-const int Message::DEFAULT_TIME_TO_LIVE = 0;
+const long long Message::DEFAULT_TIME_TO_LIVE = 0;
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Fri Aug 13 22:53:52 2010
@@ -284,7 +284,7 @@ namespace cms{
          * @throws CMSException - If an internal error occurs.
          * @throws InvalidDestinationException - if an invalid destination is specified.
          */
-        virtual MessageProducer* createProducer( const Destination* destination ) = 0;
+        virtual MessageProducer* createProducer( const Destination* destination = NULL ) = 0;
 
         /**
          * Creates a new QueueBrowser to peek at Messages on the given Queue.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/Makefile.am Fri Aug 13 22:53:52 2010
@@ -24,6 +24,7 @@ cc_sources = \
     activemq/test/JmsMessageGroupsTest.cpp \
     activemq/test/MapMessageTest.cpp \
     activemq/test/MessageCompressionTest.cpp \
+    activemq/test/MessagePriorityTest.cpp \
     activemq/test/QueueBrowserTest.cpp \
     activemq/test/SimpleRollbackTest.cpp \
     activemq/test/SimpleTest.cpp \
@@ -39,6 +40,7 @@ cc_sources = \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.cpp \
     activemq/test/openwire/OpenwireMapMessageTest.cpp \
     activemq/test/openwire/OpenwireMessageCompressionTest.cpp \
+    activemq/test/openwire/OpenwireMessagePriorityTest.cpp \
     activemq/test/openwire/OpenwireQueueBrowserTest.cpp \
     activemq/test/openwire/OpenwireSimpleRollbackTest.cpp \
     activemq/test/openwire/OpenwireSimpleTest.cpp \
@@ -71,6 +73,7 @@ h_sources = \
     activemq/test/JmsMessageGroupsTest.h \
     activemq/test/MapMessageTest.h \
     activemq/test/MessageCompressionTest.h \
+    activemq/test/MessagePriorityTest.h \
     activemq/test/QueueBrowserTest.h \
     activemq/test/SimpleRollbackTest.h \
     activemq/test/SimpleTest.h \
@@ -86,6 +89,7 @@ h_sources = \
     activemq/test/openwire/OpenwireJmsMessageGroupsTest.h \
     activemq/test/openwire/OpenwireMapMessageTest.h \
     activemq/test/openwire/OpenwireMessageCompressionTest.h \
+    activemq/test/openwire/OpenwireMessagePriorityTest.h \
     activemq/test/openwire/OpenwireQueueBrowserTest.h \
     activemq/test/openwire/OpenwireSimpleRollbackTest.h \
     activemq/test/openwire/OpenwireSimpleTest.h \

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/TestRegistry.cpp Fri Aug 13 22:53:52 2010
@@ -22,6 +22,7 @@
 #include "activemq/test/openwire/OpenwireExpirationTest.h"
 #include "activemq/test/openwire/OpenwireIndividualAckTest.h"
 #include "activemq/test/openwire/OpenwireMessageCompressionTest.h"
+#include "activemq/test/openwire/OpenwireMessagePriorityTest.h"
 #include "activemq/test/openwire/OpenwireMapMessageTest.h"
 #include "activemq/test/openwire/OpenwireQueueBrowserTest.h"
 #include "activemq/test/openwire/OpenwireSimpleRollbackTest.h"
@@ -50,6 +51,7 @@ CPPUNIT_TEST_SUITE_REGISTRATION( activem
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireExpirationTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireIndividualAckTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessageCompressionTest );
+CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMessagePriorityTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireMapMessageTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireQueueBrowserTest );
 CPPUNIT_TEST_SUITE_REGISTRATION( activemq::test::openwire::OpenwireSimpleRollbackTest );

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp Fri Aug 13 22:53:52 2010
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "MessagePriorityTest.h"
+
+#include <activemq/util/CMSListener.h>
+#include <activemq/exceptions/ActiveMQException.h>
+
+#include <decaf/lang/Thread.h>
+#include <decaf/util/UUID.h>
+#include <decaf/lang/Pointer.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::util;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    class ProducerThread : public Thread {
+    private:
+
+        Session* session;
+        Destination* destination;
+        int num;
+        int priority;
+
+    public:
+
+        ProducerThread( Session* session, Destination* destination, int num, int priority ) :
+            session( session ), destination( destination ), num( num ), priority( priority ) {
+        }
+
+        virtual ~ProducerThread() {}
+
+        virtual void run() {
+
+            Pointer<MessageProducer> producer( session->createProducer( destination ) );
+            producer->setDeliveryMode( cms::DeliveryMode::NON_PERSISTENT );
+            producer->setPriority( priority );
+
+            for( int i = 0; i < num; ++i ) {
+                Pointer<TextMessage> message( session->createTextMessage( "Test Message") );
+                producer->send( message.get() );
+            }
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessagePriorityTest::MessagePriorityTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+MessagePriorityTest::~MessagePriorityTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void MessagePriorityTest::testMessagePrioritySendReceive() {
+
+    static const int MSG_COUNT = 25;
+
+    // Create CMS Object for Comms
+    cms::Session* session( cmsProvider->getSession() );
+
+    cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+
+    Destination* destination = cmsProvider->getDestination();
+
+    ProducerThread thread1( session, destination, MSG_COUNT, 9 );
+    ProducerThread thread2( session, destination, MSG_COUNT, 1 );
+
+    thread1.start();
+    thread2.start();
+
+    thread1.join();
+    thread2.join();
+
+    Thread::sleep( 3000 );
+
+    for( int i = 0; i < MSG_COUNT * 2; ++i ) {
+        Pointer<cms::Message> message( consumer->receive( 2000 ) );
+        CPPUNIT_ASSERT( message != NULL );
+        CPPUNIT_ASSERT( message->getCMSPriority() == ( i < MSG_COUNT ? 9 : 1 ) );
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h Fri Aug 13 22:53:52 2010
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_MESSAGEPRIORITYTEST_H_
+#define _ACTIVEMQ_TEST_MESSAGEPRIORITYTEST_H_
+
+#include <activemq/test/CMSTestFixture.h>
+#include <activemq/util/IntegrationCommon.h>
+
+namespace activemq {
+namespace test {
+
+    class MessagePriorityTest : public CMSTestFixture {
+    public:
+
+        MessagePriorityTest();
+        virtual ~MessagePriorityTest();
+
+        void testMessagePrioritySendReceive();
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_TEST_MESSAGEPRIORITYTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/MessagePriorityTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp Fri Aug 13 22:53:52 2010
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenwireMessagePriorityTest.h"
+
+using namespace activemq;
+using namespace activemq::test;
+using namespace activemq::test::openwire;
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireMessagePriorityTest::OpenwireMessagePriorityTest() {
+}
+
+////////////////////////////////////////////////////////////////////////////////
+OpenwireMessagePriorityTest::~OpenwireMessagePriorityTest() {
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h?rev=985395&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h Fri Aug 13 22:53:52 2010
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGEPRIORITYTEST_H_
+#define _ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGEPRIORITYTEST_H_
+
+#include <activemq/test/MessagePriorityTest.h>
+
+namespace activemq {
+namespace test {
+namespace openwire {
+
+    class OpenwireMessagePriorityTest : public MessagePriorityTest {
+
+        CPPUNIT_TEST_SUITE( OpenwireMessagePriorityTest );
+        CPPUNIT_TEST( testMessagePrioritySendReceive );
+        CPPUNIT_TEST_SUITE_END();
+
+    public:
+
+        OpenwireMessagePriorityTest();
+        virtual ~OpenwireMessagePriorityTest();
+
+        virtual std::string getBrokerURL() const {
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
+        }
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_TEST_OPENWIRE_OPENWIREMESSAGEPRIORITYTEST_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireMessagePriorityTest.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireQueueBrowserTest.h Fri Aug 13 22:53:52 2010
@@ -39,8 +39,7 @@ namespace openwire {
         virtual ~OpenwireQueueBrowserTest();
 
         virtual std::string getBrokerURL() const {
-            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();// +
-                         //   "?transport.commandTracingEnabled=true";
+            return activemq::util::IntegrationCommon::getInstance().getOpenwireURL();
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am?rev=985395&r1=985394&r2=985395&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/Makefile.am Fri Aug 13 22:53:52 2010
@@ -36,7 +36,8 @@ cc_sources = \
     activemq/core/ActiveMQConnectionFactoryTest.cpp \
     activemq/core/ActiveMQConnectionTest.cpp \
     activemq/core/ActiveMQSessionTest.cpp \
-    activemq/core/MessageDispatchChannelTest.cpp \
+    activemq/core/FifoMessageDispatchChannelTest.cpp \
+    activemq/core/SimplePriorityMessageDispatchChannelTest.cpp \
     activemq/exceptions/ActiveMQExceptionTest.cpp \
     activemq/state/ConnectionStateTest.cpp \
     activemq/state/ConnectionStateTrackerTest.cpp \
@@ -246,7 +247,8 @@ h_sources = \
     activemq/core/ActiveMQConnectionFactoryTest.h \
     activemq/core/ActiveMQConnectionTest.h \
     activemq/core/ActiveMQSessionTest.h \
-    activemq/core/MessageDispatchChannelTest.h \
+    activemq/core/FifoMessageDispatchChannelTest.h \
+    activemq/core/SimplePriorityMessageDispatchChannelTest.h \
     activemq/exceptions/ActiveMQExceptionTest.h \
     activemq/state/ConnectionStateTest.h \
     activemq/state/ConnectionStateTrackerTest.h \



Mime
View raw message