activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1334183 [2/4] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/ main/activemq/commands/ main/activemq/core/ main/activemq/core/kernels/ main/activemq/state/ main/activemq/util/ main/cms/ main/decaf/lang/ main/decaf/util/ main/decaf...
Date Fri, 04 May 2012 21:04:54 GMT
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Fri May  4 21:04:52 2012
@@ -519,6 +519,23 @@ namespace core{
          */
         long long getNextLocalTransactionId();
 
+        /**
+         * Is the Connection configured to watch for advisory messages to maintain state of
+         * temporary destination create and destroy.
+         *
+         * @return true if the Connection will listen for temporary topic advisory messages.
+         */
+        bool isWatchTopicAdvisories() const;
+
+        /**
+         * Sets whether this Connection is listening for advisory messages regarding temporary
+         * destination creation and deletion.
+         *
+         * @param value
+         *      Boolean indicating if advisory message monitoring should be enabled.
+         */
+        void setWatchTopicAdvisories(bool value);
+
     public: // TransportListener
 
         /**
@@ -677,6 +694,14 @@ namespace core{
         void onAsyncException(const decaf::lang::Exception& ex);
 
         /**
+         * Handles async client internal exceptions which don't usually affect the connection
+         * itself.  These are reported but do not shutdown the Connection.
+         *
+         * @param ex the exception that describes the problem
+         */
+        void onClientInternalException(const decaf::lang::Exception& ex);
+
+        /**
          * Check for Closed State and Throw an exception if true.
          *
          * @throws CMSException if the Connection is closed.
@@ -700,6 +725,51 @@ namespace core{
          */
         decaf::util::concurrent::ExecutorService* getExecutor() const;
 
+        /**
+         * Adds the given Temporary Destination to this Connections collection of known
+         * Temporary Destinations.
+         *
+         * @param destination
+         *      The temporary destination that this connection should track.
+         */
+        void addTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
+
+        /**
+         * Removes the given Temporary Destination from this Connections collection of known
+         * Temporary Destinations.
+         *
+         * @param destination
+         *      The temporary destination that this connection should stop tracking.
+         */
+        void removeTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
+
+        /**
+         * Deletes the given Temporary Destination from the Broker and removes it from this
+         * Connections collection of known Temporary Destinations.
+         *
+         * @param destination
+         *      The temporary destination that this connection should remove from the Broker.
+         *
+         * @throws CMSException if the temporary destination is in use by an active Session.
+         */
+        void deleteTempDestination(Pointer<commands::ActiveMQTempDestination> destination);
+
+        /**
+         * Removes any TempDestinations that this connection has cached, ignoring any exceptions
+         * generated because the destination is in use as they should not be removed.  This method
+         * is useful for Connection pools that retain connection objects for long durations and
+         * want to periodically purge old temporary destination instances this connection is tracking.
+         */
+        void cleanUpTempDestinations();
+
+        /**
+         * Determines whether the supplied Temporary Destination has already been deleted from the
+         * Broker.  If watchTopicAdvisories is disabled this method will always return false.
+         *
+         * @returns true if the temporary destination was deleted already.
+         */
+        bool isDeleted(Pointer<commands::ActiveMQTempDestination> destination) const;
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Fri May  4 21:04:52 2012
@@ -74,6 +74,7 @@ namespace core{
         bool useAsyncSend;
         bool messagePrioritySupported;
         bool useCompression;
+        bool watchTopicAdvisories;
         int compressionLevel;
         unsigned int sendTimeout;
         unsigned int closeTimeout;
@@ -83,23 +84,24 @@ namespace core{
         std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
         std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
 
-        FactorySettings() : properties( new Properties() ),
+        FactorySettings() : properties(new Properties()),
                             username(),
                             password(),
                             clientId(),
-                            brokerURI( ActiveMQConnectionFactory::DEFAULT_URI ),
-                            dispatchAsync( true ),
-                            alwaysSyncSend( false ),
-                            useAsyncSend( false ),
-                            messagePrioritySupported( true ),
-                            useCompression( false ),
-                            compressionLevel( -1 ),
-                            sendTimeout( 0 ),
-                            closeTimeout( 15000 ),
-                            producerWindowSize( 0 ),
-                            defaultListener( NULL ),
-                            defaultPrefetchPolicy( new DefaultPrefetchPolicy() ),
-                            defaultRedeliveryPolicy( new DefaultRedeliveryPolicy() ) {
+                            brokerURI(ActiveMQConnectionFactory::DEFAULT_URI),
+                            dispatchAsync(true),
+                            alwaysSyncSend(false),
+                            useAsyncSend(false),
+                            messagePrioritySupported(true),
+                            useCompression(false),
+                            watchTopicAdvisories(true),
+                            compressionLevel(-1),
+                            sendTimeout(0),
+                            closeTimeout(15000),
+                            producerWindowSize(0),
+                            defaultListener(NULL),
+                            defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
+                            defaultRedeliveryPolicy(new DefaultRedeliveryPolicy()) {
         }
 
         void updateConfiguration( const URI& uri ) {
@@ -363,6 +365,7 @@ void ActiveMQConnectionFactory::configur
     connection->setPrefetchPolicy(this->settings->defaultPrefetchPolicy->clone());
     connection->setRedeliveryPolicy(this->settings->defaultRedeliveryPolicy->clone());
     connection->setMessagePrioritySupported(this->settings->messagePrioritySupported);
+    connection->setWatchTopicAdvisories(this->settings->watchTopicAdvisories);
 
     if (this->settings->defaultListener) {
         connection->setExceptionListener(this->settings->defaultListener);
@@ -538,3 +541,13 @@ bool ActiveMQConnectionFactory::isMessag
 void ActiveMQConnectionFactory::setMessagePrioritySupported(bool value) {
     this->settings->messagePrioritySupported = value;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isWatchTopicAdvisories() const {
+    return this->settings->watchTopicAdvisories;  // factory-level flag, initialised to true in FactorySettings
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setWatchTopicAdvisories(bool value) {
+    this->settings->watchTopicAdvisories = value;  // handed to each connection via setWatchTopicAdvisories when it is configured
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Fri May  4 21:04:52 2012
@@ -362,6 +362,23 @@ namespace core{
          */
         void setMessagePrioritySupported(bool value);
 
+        /**
+         * Is the Connection created by this factory configured to watch for advisory messages
+         * that inform the Connection about temporary destination create / destroy.
+         *
+         * @return true if Connections will listen for temporary destination advisory messages.
+         */
+        bool isWatchTopicAdvisories() const;
+
+        /**
+         * Sets whether Connections created by this factory will listen for advisory messages
+         * regarding temporary destination creation and deletion.
+         *
+         * @param value
+         *      Boolean indicating if advisory message monitoring should be enabled.
+         */
+        void setWatchTopicAdvisories(bool value);
+
     public:
 
         /**

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.cpp?rev=1334183&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.cpp Fri May  4 21:04:52 2012
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "AdvisoryConsumer.h"
+
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/util/AdvisorySupport.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/lang/exceptions/ClassCastException.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+
+using namespace decaf;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq {
+namespace core {
+
+    class AdvisoryConsumerConfig {  // private state holder used by AdvisoryConsumer
+    public:
+
+        int deliveredCounter;        // messages dispatched since the last ack was sent
+        Pointer<ConsumerInfo> info;  // consumer description, filled in by the AdvisoryConsumer constructor
+        AtomicBoolean closed;        // updated through compareAndSet in AdvisoryConsumer::dispose()
+
+        AdvisoryConsumerConfig() : deliveredCounter(0), info(), closed(false) {
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+AdvisoryConsumer::AdvisoryConsumer(ActiveMQConnection* connection, Pointer<commands::ConsumerId> consumerId) :
+    Dispatcher(), config(new AdvisoryConsumerConfig()), connection(connection) {
+    // Builds the ConsumerInfo for the advisory topic, registers this dispatcher and sends the info.
+    if (connection == NULL) {
+        throw NullPointerException(__FILE__, __LINE__, "Parent Connection pointer was NULL");
+    }
+
+    this->config->info.reset(new ConsumerInfo());
+    // Destination is whatever AdvisorySupport returns as the composite temp destination advisory topic.
+    Pointer<ActiveMQDestination> destination(AdvisorySupport::getTempDestinationCompositeAdvisoryTopic());
+
+    this->config->info->setConsumerId(consumerId);
+    this->config->info->setDestination(destination);
+    this->config->info->setPrefetchSize(1000);
+    this->config->info->setNoLocal(true);
+    // NOTE(review): any throw in this constructor skips the destructor, so config is not deleted
+    this->connection->addDispatcher(this->config->info->getConsumerId(), this);  // and this entry is not removed if syncRequest throws - confirm
+    this->connection->syncRequest(this->config->info);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+AdvisoryConsumer::~AdvisoryConsumer() {  // frees the private config; dispose() is not invoked here
+    try {
+        delete config;  // allocated in the constructor's initializer list
+    }
+    AMQ_CATCH_NOTHROW(ActiveMQException)
+    AMQ_CATCHALL_NOTHROW()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AdvisoryConsumer::dispose() {
+    // Shuts this consumer down once: compareAndSet(false, true) succeeds only for the first caller.
+    if (this->config->closed.compareAndSet(false, true)) {
+        // Best effort: send the consumer's remove command, ignoring CMS send failures.
+        try {
+            this->connection->oneway(this->config->info->createRemoveCommand());
+        } catch (cms::CMSException& e) {
+        }
+        // Always unregister from the connection, even when the remove command could not be sent.
+        this->connection->removeDispatcher(this->config->info->getConsumerId());
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AdvisoryConsumer::dispatch(const Pointer<MessageDispatch>& message) {
+    // Counts the delivery, acks in batches, then hands any DestinationInfo payload on for processing.
+    // Ack everything delivered so far once the count exceeds 75% of the prefetch size
+    this->config->deliveredCounter++;
+    if (this->config->deliveredCounter > (0.75 * this->config->info->getPrefetchSize())) {
+        try {
+
+            Pointer<MessageAck> ack(new MessageAck());
+
+            ack->setAckType(ActiveMQConstants::ACK_TYPE_CONSUMED);
+            ack->setConsumerId(this->config->info->getConsumerId());
+            ack->setDestination(message->getDestination());
+            ack->setMessageCount(this->config->deliveredCounter);
+            ack->setLastMessageId(message->getMessage()->getMessageId());
+
+            this->connection->oneway(ack);
+            // Reached only when oneway() did not throw; otherwise the count carries over to the next ack.
+            this->config->deliveredCounter = 0;
+        } catch (Exception& e) {  // report the failure to the connection and keep processing this message
+            this->connection->onClientInternalException(e);
+        }
+    }
+
+    Pointer<DataStructure> object = message->getMessage()->getDataStructure();
+    if (object != NULL) {
+        try {
+            Pointer<DestinationInfo> info = object.dynamicCast<DestinationInfo>();
+            processDestinationInfo(info);
+        } catch (ClassCastException& ex) {  // a failed cast (here or inside processDestinationInfo) is ignored
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void AdvisoryConsumer::processDestinationInfo(Pointer<commands::DestinationInfo> info) {
+    // Applies a temp destination add/remove advisory to the connection's known temporary destinations.
+    Pointer<ActiveMQDestination> dest = info->getDestination();
+    if (dest == NULL || !dest->isTemporary()) {
+        return;  // no destination attached, or not a temporary one: nothing to track
+    }
+
+    Pointer<ActiveMQTempDestination> tempDest = dest.dynamicCast<ActiveMQTempDestination>();
+    if (info->getOperationType() == ActiveMQConstants::DESTINATION_ADD_OPERATION) {
+        this->connection->addTempDestination(tempDest);
+    } else if (info->getOperationType() == ActiveMQConstants::DESTINATION_REMOVE_OPERATION) {
+        this->connection->removeTempDestination(tempDest);
+    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.h?rev=1334183&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.h Fri May  4 21:04:52 2012
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ADVISORYCONSUMER_H_
+#define _ACTIVEMQ_CORE_ADVISORYCONSUMER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/core/Dispatcher.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/DestinationInfo.h>
+
+#include <decaf/lang/Pointer.h>
+
+namespace activemq {
+namespace core {
+
+    class AdvisoryConsumerConfig;
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API AdvisoryConsumer : Dispatcher {  // Dispatcher for temp destination advisory messages
+    private:
+
+        AdvisoryConsumerConfig* config;  // private state, deleted in the destructor
+        ActiveMQConnection* connection;  // parent connection; must be non-NULL at construction
+
+    private:
+
+        AdvisoryConsumer(const AdvisoryConsumer&);  // copy operations are declared private
+        AdvisoryConsumer& operator= (const AdvisoryConsumer&);
+
+    public:
+
+        AdvisoryConsumer(ActiveMQConnection* connection, Pointer<commands::ConsumerId> consumerId);
+        virtual ~AdvisoryConsumer();
+
+    public:
+
+        void dispose();  // sends the consumer's remove command and unregisters this dispatcher
+
+        virtual void dispatch(const Pointer<MessageDispatch>& message);  // implements Dispatcher::dispatch
+
+    private:
+
+        void processDestinationInfo(Pointer<commands::DestinationInfo> destination);  // add/remove temp destination on the connection
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_CORE_ADVISORYCONSUMER_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/AdvisoryConsumer.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/DispatchData.h Fri May  4 21:04:52 2012
@@ -44,8 +44,8 @@ namespace core {
 
         DispatchData() {}
 
-        DispatchData( const decaf::lang::Pointer<commands::ConsumerId>& consumer,
-                      const decaf::lang::Pointer<commands::Message>& message ) {
+        DispatchData(const decaf::lang::Pointer<commands::ConsumerId>& consumer,
+                     const decaf::lang::Pointer<commands::Message>& message) {
             this->consumerId = consumer;
             this->message = message;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Fri May  4 21:04:52 2012
@@ -22,8 +22,8 @@
 #include <activemq/util/Config.h>
 #include <decaf/lang/Pointer.h>
 
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
 
     using decaf::lang::Pointer;
     using activemq::commands::MessageDispatch;
@@ -43,7 +43,7 @@ namespace core{
          * @param message
          *      The message to be dispatched to a waiting consumer.
          */
-        virtual void dispatch( const Pointer<MessageDispatch>& message ) = 0;
+        virtual void dispatch(const Pointer<MessageDispatch>& message) = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Fri May  4 21:04:52 2012
@@ -69,8 +69,8 @@ namespace kernels {
     class ActiveMQConsumerKernelConfig {
     private:
 
-        ActiveMQConsumerKernelConfig( const ActiveMQConsumerKernelConfig& );
-        ActiveMQConsumerKernelConfig& operator= ( const ActiveMQConsumerKernelConfig& );
+        ActiveMQConsumerKernelConfig(const ActiveMQConsumerKernelConfig&);
+        ActiveMQConsumerKernelConfig& operator=(const ActiveMQConsumerKernelConfig&);
 
     public:
 
@@ -289,7 +289,7 @@ namespace kernels {
 
         virtual void run() {
             try {
-                if(!this->consumer->isClosed()) {
+                if (!this->consumer->isClosed()) {
                     this->consumer->start();
                 }
             } catch(cms::CMSException& ex) {
@@ -311,18 +311,34 @@ ActiveMQConsumerKernel::ActiveMQConsumer
                                                bool noLocal,
                                                bool browser,
                                                bool dispatchAsync,
-                                               cms::MessageListener* listener ) : internal(NULL), session(NULL), consumerInfo() {
+                                               cms::MessageListener* listener) : internal(NULL), session(NULL), consumerInfo() {
 
     if (session == NULL) {
-        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Init with NULL Session");
+        throw IllegalArgumentException(__FILE__, __LINE__, "Consumer created with NULL Session");
     }
 
     if (destination == NULL) {
-        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Init with NULL Destination");
+        throw cms::InvalidDestinationException("Consumer created with NULL Destination");
+    } else if (destination->getPhysicalName() == "") {
+        throw cms::InvalidDestinationException("Destination given has no Physical Name.");
+    } else if (destination->isTemporary()) {
+
+        std::string physicalName = destination->getPhysicalName();
+        std::string connectionId = session->getConnection()->getConnectionInfo().getConnectionId()->getValue();
+
+        if (physicalName.find(connectionId) == std::string::npos) {
+            throw cms::InvalidDestinationException("Cannot use a Temporary destination from another Connection");
+        }
+
+        Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>();
+
+        if (session->getConnection()->isDeleted(tempDest)) {
+            throw cms::InvalidDestinationException("Cannot use a Temporary destination that has been deleted");
+        }
     }
 
-    if (destination->getPhysicalName() == "") {
-        throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConsumerKernel::ActiveMQConsumerKernel - Destination given has no Physical Name.");
+    if (prefetch < 0) {
+        throw IllegalArgumentException(__FILE__, __LINE__, "Cannot create a consumer with a negative prefetch");
     }
 
     this->internal = new ActiveMQConsumerKernelConfig();
@@ -448,6 +464,7 @@ void ActiveMQConsumerKernel::doClose() {
 void ActiveMQConsumerKernel::dispose() {
 
     try {
+
         if (!this->isClosed()) {
 
             if (!session->isTransacted()) {
@@ -513,7 +530,7 @@ std::string ActiveMQConsumerKernel::getM
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue( long long timeout ) {
+decaf::lang::Pointer<MessageDispatch> ActiveMQConsumerKernel::dequeue(long long timeout) {
 
     try {
 
@@ -528,29 +545,29 @@ decaf::lang::Pointer<MessageDispatch> Ac
         // Loop until the time is up or we get a non-expired message
         while (true) {
 
-            Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue( timeout );
+            Pointer<MessageDispatch> dispatch = this->internal->unconsumedMessages->dequeue(timeout);
             if (dispatch == NULL) {
 
-                if( timeout > 0 && !this->internal->unconsumedMessages->isClosed() ) {
-                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+                if (timeout > 0 && !this->internal->unconsumedMessages->isClosed()) {
+                    timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
                 } else {
-                    if( this->internal->failureError != NULL ) {
+                    if (this->internal->failureError != NULL) {
                         throw CMSExceptionSupport::create(*this->internal->failureError);
                     } else {
-                        return Pointer<MessageDispatch>();
+                        return Pointer<MessageDispatch> ();
                     }
                 }
 
-            } else if( dispatch->getMessage() == NULL ) {
+            } else if (dispatch->getMessage() == NULL) {
 
-                return Pointer<MessageDispatch>();
+                return Pointer<MessageDispatch> ();
 
-            } else if( dispatch->getMessage()->isExpired() ) {
+            } else if (dispatch->getMessage()->isExpired()) {
 
-                beforeMessageIsConsumed( dispatch );
-                afterMessageIsConsumed( dispatch, true );
-                if( timeout > 0 ) {
-                    timeout = Math::max( deadline - System::currentTimeMillis(), 0LL );
+                beforeMessageIsConsumed(dispatch);
+                afterMessageIsConsumed(dispatch, true);
+                if (timeout > 0) {
+                    timeout = Math::max(deadline - System::currentTimeMillis(), 0LL);
                 }
 
                 continue;
@@ -573,24 +590,24 @@ cms::Message* ActiveMQConsumerKernel::re
         this->checkClosed();
 
         // Send a request for a new message if needed
-        this->sendPullRequest( 0 );
+        this->sendPullRequest(0);
 
         // Wait for the next message.
-        Pointer<MessageDispatch> message = dequeue( -1 );
-        if( message == NULL ) {
+        Pointer<MessageDispatch> message = dequeue(-1);
+        if (message == NULL) {
             return NULL;
         }
 
         // Message pre-processing
-        beforeMessageIsConsumed( message );
+        beforeMessageIsConsumed(message);
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+            dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
+        afterMessageIsConsumed(message, false);
 
         // Return the cloned message.
         return clonedMessage;
@@ -606,24 +623,24 @@ cms::Message* ActiveMQConsumerKernel::re
         this->checkClosed();
 
         // Send a request for a new message if needed
-        this->sendPullRequest( millisecs );
+        this->sendPullRequest(millisecs);
 
         // Wait for the next message.
-        Pointer<MessageDispatch> message = dequeue( millisecs );
-        if( message == NULL ) {
+        Pointer<MessageDispatch> message = dequeue(millisecs);
+        if (message == NULL) {
             return NULL;
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed( message );
+        beforeMessageIsConsumed(message);
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+            dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
+        afterMessageIsConsumed(message, false);
 
         // Return the cloned message.
         return clonedMessage;
@@ -639,24 +656,24 @@ cms::Message* ActiveMQConsumerKernel::re
         this->checkClosed();
 
         // Send a request for a new message if needed
-        this->sendPullRequest( -1 );
+        this->sendPullRequest(-1);
 
         // Get the next available message, if there is one.
-        Pointer<MessageDispatch> message = dequeue( 0 );
-        if( message == NULL ) {
+        Pointer<MessageDispatch> message = dequeue(0);
+        if (message == NULL) {
             return NULL;
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed( message );
+        beforeMessageIsConsumed(message);
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMessage =
-            dynamic_cast<cms::Message*>( message->getMessage()->cloneDataStructure() );
+            dynamic_cast<cms::Message*>(message->getMessage()->cloneDataStructure());
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed( message, false );
+        afterMessageIsConsumed(message, false);
 
         // Return the cloned message.
         return clonedMessage;
@@ -671,31 +688,30 @@ void ActiveMQConsumerKernel::setMessageL
 
         this->checkClosed();
 
-        if( this->consumerInfo->getPrefetchSize() == 0 && listener != NULL ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
+        if (this->consumerInfo->getPrefetchSize() == 0 && listener != NULL) {
+            throw ActiveMQException(__FILE__, __LINE__,
                 "Cannot deliver async when Prefetch is Zero, set Prefecth to at least One.");
         }
 
-        if( listener != NULL ) {
+        if (listener != NULL) {
 
             // Now that we have a valid message listener, redispatch all the messages that it missed.
             bool wasStarted = session->isStarted();
-            if( wasStarted ) {
+            if (wasStarted) {
                 session->stop();
             }
 
-            synchronized( &(this->internal->listenerMutex) ) {
+            synchronized(&(this->internal->listenerMutex)) {
                 this->internal->listener = listener;
             }
 
-            this->session->redispatch( *(this->internal->unconsumedMessages) );
+            this->session->redispatch(*(this->internal->unconsumedMessages));
 
-            if( wasStarted ) {
+            if (wasStarted) {
                 this->session->start();
             }
         } else {
-            synchronized( &(this->internal->listenerMutex) ) {
+            synchronized(&(this->internal->listenerMutex)) {
                 this->internal->listener = NULL;
             }
         }
@@ -708,26 +724,25 @@ void ActiveMQConsumerKernel::beforeMessa
 
     // If the Session is in ClientAcknowledge or IndividualAcknowledge mode, then
     // we set the handler in the message to this object and send it out.
-    if( session->isClientAcknowledge() ) {
-        Pointer<ActiveMQAckHandler> ackHandler( new ClientAckHandler( this->session ) );
-        dispatch->getMessage()->setAckHandler( ackHandler );
-    } else if( session->isIndividualAcknowledge() ) {
-        Pointer<ActiveMQAckHandler> ackHandler( new IndividualAckHandler( this, dispatch ) );
-        dispatch->getMessage()->setAckHandler( ackHandler );
+    if (session->isClientAcknowledge()) {
+        Pointer<ActiveMQAckHandler> ackHandler(new ClientAckHandler(this->session));
+        dispatch->getMessage()->setAckHandler(ackHandler);
+    } else if (session->isIndividualAcknowledge()) {
+        Pointer<ActiveMQAckHandler> ackHandler(new IndividualAckHandler(this, dispatch));
+        dispatch->getMessage()->setAckHandler(ackHandler);
     }
 
-    this->internal->lastDeliveredSequenceId =
-        dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
+    this->internal->lastDeliveredSequenceId = dispatch->getMessage()->getMessageId()->getBrokerSequenceId();
 
-    if( !isAutoAcknowledgeBatch() ) {
+    if (!isAutoAcknowledgeBatch()) {
 
         // When not in an Auto
-        synchronized( &this->internal->dispatchedMessages ) {
-            this->internal->dispatchedMessages.addFirst( dispatch );
+        synchronized(&this->internal->dispatchedMessages) {
+            this->internal->dispatchedMessages.addFirst(dispatch);
         }
 
-        if( this->session->isTransacted() ) {
-            ackLater( dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED );
+        if (this->session->isTransacted()) {
+            ackLater(dispatch, ActiveMQConstants::ACK_TYPE_DELIVERED);
         }
     }
 }
@@ -1354,3 +1369,8 @@ void ActiveMQConsumerKernel::setPrefetch
     deliverAcks();
     this->consumerInfo->setCurrentPrefetchSize(prefetchSize);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConsumerKernel::isInUse(Pointer<ActiveMQDestination> destination) const {
+    return this->consumerInfo->getDestination()->equals(destination.get()); // result of equals() on the destination held by consumerInfo
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Fri May  4 21:04:52 2012
@@ -110,7 +110,7 @@ namespace kernels {
 
         virtual void dispatch( const Pointer<MessageDispatch>& message );
 
-    public:  // ActiveMQConsumer Methods
+    public:  // ActiveMQConsumerKernel Methods
 
         /**
          * Method called to acknowledge all messages that have been received so far.
@@ -262,6 +262,13 @@ namespace kernels {
          */
         void setPrefetchSize(int prefetchSize);
 
+        /**
+         * Checks if the given destination is the Destination that this Consumer is subscribed to.
+         * @param destination the destination to compare with the one this Consumer is subscribed to.
+         * @return true if the consumer is subscribed to the given destination.
+         */
+        bool isInUse(Pointer<commands::ActiveMQDestination> destination) const;
+
     protected:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Fri May  4 21:04:52 2012
@@ -23,6 +23,7 @@
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/util/CMSExceptionSupport.h>
 #include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/ActiveMQMessageTransformation.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
@@ -54,7 +55,8 @@ ActiveMQProducerKernel::ActiveMQProducer
                                                                         producerInfo(),
                                                                         closed(false),
                                                                         memoryUsage(),
-                                                                        destination() {
+                                                                        destination(),
+                                                                        messageSequence() {
 
     if (session == NULL || producerId == NULL) {
         throw ActiveMQException(
@@ -75,7 +77,7 @@ ActiveMQProducerKernel::ActiveMQProducer
         this->producerInfo->setDispatchAsync(
             Boolean::parseBoolean(options.getProperty("producer.dispatchAsync", "false")));
 
-        this->destination = destination.dynamicCast<cms::Destination> ();
+        this->destination = destination.dynamicCast<cms::Destination>();
     }
 
     // TODO - Check for need of MemoryUsage if there's a producer Windows size
@@ -176,13 +178,20 @@ void ActiveMQProducerKernel::send(const 
             throw cms::InvalidDestinationException("Don't understand null destinations", NULL);
         }
 
-        const cms::Destination* dest;
-        if (destination == dynamic_cast<cms::Destination*> (this->producerInfo->getDestination().get())) {
-            dest = destination;
+        Pointer<ActiveMQDestination> dest;
+        const ActiveMQDestination* transformed;
+
+        if (destination == this->destination.get()) {
+            dest = this->producerInfo->getDestination();
         } else if (this->producerInfo->getDestination() == NULL) {
-            // TODO - We should apply a Transform so ensure the user hasn't create some
-            //        external cms::Destination implementation.
-            dest = destination;
+            // We always need to use a copy of the user's destination since we want to control
+            // its lifetime.  If the transform results in a new destination we can use that, but
+            // if it's already an ActiveMQDestination then we need to clone it.
+            if (ActiveMQMessageTransformation::transformDestination(destination, &transformed)) {
+                dest.reset(const_cast<ActiveMQDestination*>(transformed));
+            } else {
+                dest.reset(transformed->cloneDataStructure());
+            }
         } else {
             throw cms::UnsupportedOperationException(
                 string("This producer can only send messages to: ") +
@@ -193,27 +202,16 @@ void ActiveMQProducerKernel::send(const 
             throw cms::CMSException("No destination specified", NULL);
         }
 
-        // configure the message
-        message->setCMSDestination(dest);
-        message->setCMSDeliveryMode(deliveryMode);
-        message->setCMSPriority(priority);
-
-        long long expiration = 0LL;
-
-        if (!disableTimestamps) {
-
-            long long timeStamp = System::currentTimeMillis();
-            message->setCMSTimestamp(timeStamp);
-            if (timeToLive > 0LL) {
-                expiration = timeToLive + timeStamp;
+        if (this->memoryUsage.get() != NULL) {
+            try {
+                this->memoryUsage->waitForSpace();
+            } catch (InterruptedException& e) {
+                throw cms::CMSException("Send aborted due to thread interrupt.");
             }
         }
 
-        message->setCMSExpiration(expiration);
-
-        // Delegate send to the session so that it can choose how to
-        // send the message.
-        this->session->send(message, this, this->memoryUsage.get());
+        this->session->send(this, dest, message, deliveryMode, priority, timeToLive,
+                            this->memoryUsage.get(), this->sendTimeout);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h Fri May  4 21:04:52 2012
@@ -25,6 +25,7 @@
 
 #include <activemq/util/Config.h>
 #include <activemq/util/MemoryUsage.h>
+#include <activemq/util/LongSequenceGenerator.h>
 #include <activemq/commands/ProducerInfo.h>
 #include <activemq/commands/ProducerAck.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -75,6 +76,9 @@ namespace kernels {
         // The Destination assigned at creation, NULL if not assigned.
         Pointer<cms::Destination> destination;
 
+        // Generator of Message Sequence Id numbers for this producer.
+        util::LongSequenceGenerator messageSequence;
+
     private:
 
         ActiveMQProducerKernel(const ActiveMQProducerKernel&);
@@ -211,8 +215,6 @@ namespace kernels {
             return this->sendTimeout;
         }
 
-    public:
-
         /**
          * @returns true if this Producer has been closed.
          */
@@ -252,6 +254,13 @@ namespace kernels {
          */
         void dispose();
 
+        /**
+         * @returns the next id from this Producer's messageSequence generator, for a Message sent from this Producer.
+         */
+        long long getNextMessageSequence() {
+            return this->messageSequence.getNextSequenceId();
+        }
+
     private:
 
        // Checks for the closed state and throws if so.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Fri May  4 21:04:52 2012
@@ -27,6 +27,7 @@
 #include <activemq/core/ActiveMQSessionExecutor.h>
 #include <activemq/core/PrefetchPolicy.h>
 #include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/ActiveMQMessageTransformation.h>
 #include <activemq/util/CMSExceptionSupport.h>
 
 #include <activemq/commands/ConsumerInfo.h>
@@ -55,6 +56,7 @@
 #include <decaf/lang/Long.h>
 #include <decaf/lang/Math.h>
 #include <decaf/util/Queue.h>
+#include <decaf/util/concurrent/Mutex.h>
 #include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -98,10 +100,12 @@ namespace kernels{
         decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
         Pointer<Scheduler> scheduler;
         Pointer<CloseSynhcronization> closeSync;
+        ConsumersMap consumers;
+        Mutex sendMutex;
 
     public:
 
-        SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync() {}
+        SessionConfig() : synchronizationRegistered(false), producers(), scheduler(), closeSync(), consumers(), sendMutex() {}
         ~SessionConfig() {}
     };
 
@@ -830,66 +834,98 @@ bool ActiveMQSessionKernel::isTransacted
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionKernel::send(cms::Message* message, ActiveMQProducerKernel* producer, util::Usage* usage) {
+void ActiveMQSessionKernel::send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
+                                 cms::Message* message, int deliveryMode, int priority, long long timeToLive,
+                                 util::MemoryUsage* producerWindow, long long sendTimeout) {
 
     try {
 
         this->checkClosed();
 
-        commands::Message* amqMessage = dynamic_cast< commands::Message* >(message);
-
-        if (amqMessage == NULL) {
-            throw ActiveMQException(__FILE__, __LINE__,
-                "ActiveMQSessionKernel::send - Message is not a valid Open Wire type.");
+        if (destination->isTemporary()) {
+            Pointer<ActiveMQTempDestination> tempDest = destination.dynamicCast<ActiveMQTempDestination>();
+            if (this->connection->isDeleted(tempDest)) {
+                throw cms::InvalidDestinationException(
+                    std::string("Cannot publish to a deleted Destination: ") + destination->toString());
+            }
         }
 
-        // Clear any old data that might be in the message object
-        amqMessage->getMessageId().reset(NULL);
-        amqMessage->getProducerId().reset(NULL);
-        amqMessage->getTransactionId().reset(NULL);
+        synchronized(&this->config->sendMutex) {
 
-        // Always assign the message ID, regardless of the disable
-        // flag.  Not adding a message ID will cause an NPE at the broker.
-        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
-        id->setProducerId(producer->getProducerInfo()->getProducerId());
-        id->setProducerSequenceId(this->getNextProducerSequenceId());
-
-        amqMessage->setMessageId(id);
+            // Ensure that a new transaction is started if this is the first message
+            // sent since the last commit, Broker is notified of a new TX.
+            doStartTransaction();
+
+            Pointer<TransactionId> txId = this->transaction->getTransactionId();
+            Pointer<ProducerInfo> producerInfo = producer->getProducerInfo();
+            Pointer<ProducerId> producerId = producerInfo->getProducerId();
+            long long sequenceId = producer->getNextMessageSequence();
+
+            // Set the "CMS" header fields on the original message, see JMS 1.1 spec section 3.4.11
+            message->setCMSDeliveryMode(deliveryMode);
+            long long expiration = 0LL;
+            if (!producer->getDisableMessageTimeStamp()) {
+                long long timeStamp = System::currentTimeMillis();
+                message->setCMSTimestamp(timeStamp);
+                if (timeToLive > 0) {
+                    expiration = timeToLive + timeStamp;
+                }
+            }
+            message->setCMSExpiration(expiration);
+            message->setCMSPriority(priority);
+            message->setCMSRedelivered(false);
+
+            // transform to our own message format here
+            commands::Message* transformed = NULL;
+            Pointer<commands::Message> amqMessage;
+
+            // Always assign the message ID, regardless of the disable flag.
+            // Not adding a message ID will cause an NPE at the broker.
+            decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+            id->setProducerId(producerId);
+            id->setProducerSequenceId(sequenceId);
+
+            // NOTE:
+            // Now we copy the message before sending, this allows the user to reuse the
+            // message object without interfering with the copy that's being sent.  We
+            // could make this step optional to increase performance but for now we won't.
+            // To not do this implies that the user must never reuse the message object, or
+            // know that the configuration of Transports doesn't involve the message hanging
+            // around beyond the point that send returns.  When the transform step results in
+            // a new Message object being created we can just use that new instance, but when
+            // the original cms::Message pointer was already a commands::Message then we need
+            // to clone it.
+            if (ActiveMQMessageTransformation::transformMessage(message, connection, &transformed)) {
+                amqMessage.reset(transformed);
+                // Sets the Message ID on the original message per spec.
+                message->setCMSMessageID(id->toString());
+            } else {
+                amqMessage.reset(transformed->cloneDataStructure());
+            }
 
-        // Ensure that a new transaction is started if this is the first message
-        // sent since the last commit.
-        doStartTransaction();
-        amqMessage->setTransactionId(this->transaction->getTransactionId());
+            amqMessage->setMessageId(id);
+            amqMessage->getBrokerPath().clear();
+            amqMessage->setTransactionId(txId);
+            amqMessage->setConnection(this->connection);
+
+            // destination format is provider specific so only set on transformed message
+            amqMessage->setDestination(destination);
 
-        // NOTE:
-        // Now we copy the message before sending, this allows the user to reuse the
-        // message object without interfering with the copy that's being sent.  We
-        // could make this step optional to increase performance but for now we won't.
-        // To not do this implies that the user must never reuse the message object, or
-        // know that the configuration of Transports doesn't involve the message hanging
-        // around beyond the point that send returns.
-        Pointer<commands::Message> msgCopy(amqMessage->cloneDataStructure());
+            amqMessage->onSend();
+            amqMessage->setProducerId(producerId);
 
-        msgCopy->onSend();
-        msgCopy->setProducerId( producer->getProducerInfo()->getProducerId() );
+            if (sendTimeout <= 0 && !amqMessage->isResponseRequired() && !this->connection->isAlwaysSyncSend() &&
+                (!amqMessage->isPersistent() || this->connection->isUseAsyncSend() || amqMessage->getTransactionId() != NULL)) {
 
-        if (this->connection->getSendTimeout() <= 0 &&
-            !msgCopy->isResponseRequired() &&
-            !this->connection->isAlwaysSyncSend() &&
-            (!msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
-               msgCopy->getTransactionId() != NULL)) {
+                if (producerWindow != NULL) {
+                    producerWindow->enqueueUsage(amqMessage->getSize());
+                }
 
-            if (usage != NULL) {
-                usage->enqueueUsage(msgCopy->getSize());
+                // No Response Required, send is asynchronous.
+                this->connection->oneway(amqMessage);
+            } else {
+                this->connection->syncRequest(amqMessage, (unsigned int)sendTimeout);
             }
-
-            // No Response Required.
-            this->connection->oneway(msgCopy);
-
-        } else {
-
-            // Send the message to the broker.
-            this->connection->syncRequest(msgCopy, this->connection->getSendTimeout());
         }
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
@@ -998,6 +1034,7 @@ void ActiveMQSessionKernel::createTempor
 
         // Now that its setup, link it to this Connection so it can be closed.
         tempDestination->setConnection(this->connection);
+        this->connection->addTempDestination(Pointer<ActiveMQTempDestination>(tempDestination->cloneDataStructure()));
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
@@ -1005,6 +1042,23 @@ void ActiveMQSessionKernel::createTempor
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSessionKernel::isInUse(Pointer<ActiveMQDestination> destination) {
+
+    typedef std::vector< Pointer<ActiveMQConsumerKernel> > ConsumerList;
+    bool inUse = false;
+
+    // Copy the consumers out and scan the copy while the consumers lock is held; stop at the first match.
+    synchronized(&this->consumers) {
+        ConsumerList snapshot = this->consumers.values();
+        for (ConsumerList::iterator cursor = snapshot.begin(); !inUse && cursor != snapshot.end(); ++cursor) {
+            inUse = (*cursor)->isInUse(destination);
+        }
+    }
+
+    return inUse;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::destroyTemporaryDestination(
     commands::ActiveMQTempDestination* tempDestination) {
 
@@ -1040,7 +1094,6 @@ std::string ActiveMQSessionKernel::creat
 void ActiveMQSessionKernel::oneway(Pointer<Command> command) {
 
     try {
-        this->checkClosed();
         this->connection->oneway(command);
     }
     AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
@@ -1092,8 +1145,6 @@ void ActiveMQSessionKernel::removeConsum
 
     try {
 
-        this->checkClosed();
-
         synchronized(&this->consumers) {
             if (this->consumers.containsKey(consumerId)) {
                 // Remove this Id both from the Sessions Map of Consumers and from the Connection.
@@ -1125,7 +1176,6 @@ void ActiveMQSessionKernel::addProducer(
 void ActiveMQSessionKernel::removeProducer(Pointer<ActiveMQProducerKernel> producer) {
 
     try {
-        this->checkClosed();
         this->connection->removeProducer(producer->getProducerId());
         this->config->producers.remove(producer);
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Fri May  4 21:04:52 2012
@@ -255,7 +255,7 @@ namespace kernels {
 
         virtual void unsubscribe(const std::string& name);
 
-   public:   // ActiveMQSession specific Methods
+   public:   // ActiveMQSessionKernel specific Methods
 
         /**
          * Sends a message from the Producer specified using this session's connection
@@ -264,24 +264,37 @@ namespace kernels {
          * <p>
          * Asynchronous sends will be chosen if at all possible.
          *
-         * @param message
-         *        The message to send to the broker.
          * @param producer
-         *        The sending Producer
+         *      The sending Producer
+         * @param destination
+         *      The target destination for the Message.
+         * @param message
+         *      The message to send to the broker.
+         * @param deliveryMode
+         *      The delivery mode to assign to the outgoing message.
+         * @param priority
+         *      The priority value to assign to the outgoing message.
+         * @param timeToLive
+         *      The time to live for the outgoing message.
          * @param usage
-         *        Pointer to a Usage tracker which if set will be increased by the size
-         *        of the given message.
-         *
-         * @throws CMSException
-         */
-        void send(cms::Message* message, kernels::ActiveMQProducerKernel* producer, util::Usage* usage);
+         *      Pointer to a Usage tracker which if set will be increased by the size
+         *      of the given message.
+         * @param sendTimeout
+         *      The amount of time to block during send before failing, or 0 to wait forever.
+         *
+         * @throws CMSException if an error occurs while sending the message.
+         */
+        void send(kernels::ActiveMQProducerKernel* producer, Pointer<commands::ActiveMQDestination> destination,
+                  cms::Message* message, int deliveryMode, int priority, long long timeToLive,
+                  util::MemoryUsage* producerWindow, long long sendTimeout);
 
         /**
          * This method gets any registered exception listener of this sessions
          * connection and returns it.  Mainly intended for use by the objects
          * that this session creates so that they can notify the client of
          * exceptions that occur in the context of another thread.
-         * @returns cms::ExceptionListener pointer or NULL
+         *
+         * @returns the registered cms::ExceptionListener pointer or NULL
          */
         cms::ExceptionListener* getExceptionListener();
 
@@ -497,6 +510,13 @@ namespace kernels {
          */
         void close(Pointer<commands::ConsumerId> id);
 
+        /**
+         * Checks if the given destination is currently in use by any consumers in this Session.
+         * @param destination the destination to look for among this Session's consumers.
+         * @return true if there is a consumer of this destination in this Session.
+         */
+        bool isInUse(Pointer<commands::ActiveMQDestination> destination);
+
    private:
 
        /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp?rev=1334183&r1=1334182&r2=1334183&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/state/ConnectionState.cpp Fri May  4 21:04:52 2012
@@ -19,8 +19,12 @@
 
 #include <decaf/lang/exceptions/IllegalStateException.h>
 
+#include <activemq/commands/SessionId.h>
+#include <activemq/commands/SessionInfo.h>
+
 using namespace activemq;
 using namespace activemq::state;
+using namespace activemq::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
 ConnectionState::ConnectionState( const Pointer<ConnectionInfo>& info ) :
@@ -31,6 +35,13 @@ ConnectionState::ConnectionState( const 
     disposed(false),
     connectionInterruptProcessingComplete(true),
     recoveringPullConsumers() {
+
+    Pointer<SessionId> sessionId(new SessionId(info->getConnectionId().get(), -1));
+    Pointer<SessionInfo> session(new SessionInfo());
+    session->setSessionId(sessionId);
+
+    // Add the default session id.
+    addSession(session);
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp?rev=1334183&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp Fri May  4 21:04:52 2012
@@ -0,0 +1,193 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQMessageTransformation.h"
+
+#include <cms/Topic.h>
+#include <cms/TemporaryTopic.h>
+#include <cms/Queue.h>
+#include <cms/TemporaryQueue.h>
+#include <cms/Message.h>
+#include <cms/BytesMessage.h>
+#include <cms/MapMessage.h>
+#include <cms/StreamMessage.h>
+#include <cms/ObjectMessage.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQQueue.h>
+#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/commands/ActiveMQTempQueue.h>
+#include <activemq/commands/ActiveMQTempTopic.h>
+#include <activemq/commands/ActiveMQMessage.h>
+#include <activemq/commands/ActiveMQMapMessage.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/commands/ActiveMQStreamMessage.h>
+#include <activemq/commands/ActiveMQBlobMessage.h>
+#include <activemq/commands/ActiveMQBytesMessage.h>
+#include <activemq/commands/ActiveMQObjectMessage.h>
+#include <activemq/commands/Message.h>
+
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQMessageTransformation::ActiveMQMessageTransformation() { // empty body: no work is done on construction
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQMessageTransformation::~ActiveMQMessageTransformation() { // empty body: no work is done on destruction
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQMessageTransformation::transformDestination(const cms::Destination* destination, const ActiveMQDestination** amqDestination) {
+
+    // Stores in *amqDestination an ActiveMQDestination for the given cms::Destination.
+    // Returns false when destination already is an ActiveMQDestination: the same pointer is
+    // handed back and nothing is allocated.  Returns true otherwise, in which case any object
+    // stored in *amqDestination was created here with new.  Throws when amqDestination is NULL.
+    if (amqDestination == NULL) {
+        throw cms::CMSException("Provided target ActiveMQDestination pointer was NULL");
+    }
+
+    *amqDestination = dynamic_cast<const ActiveMQDestination*>(destination);
+    if (*amqDestination != NULL) {
+        return false;
+    }
+
+    // Each cast is performed once and its result reused.  The checks keep their original
+    // order; when none matches, *amqDestination stays NULL and true is still returned.
+    if (const cms::TemporaryQueue* tempQueue = dynamic_cast<const cms::TemporaryQueue*>(destination)) {
+        *amqDestination = new ActiveMQTempQueue(tempQueue->getQueueName());
+    } else if (const cms::TemporaryTopic* tempTopic = dynamic_cast<const cms::TemporaryTopic*>(destination)) {
+        *amqDestination = new ActiveMQTempTopic(tempTopic->getTopicName());
+    } else if (const cms::Queue* queue = dynamic_cast<const cms::Queue*>(destination)) {
+        *amqDestination = new ActiveMQQueue(queue->getQueueName());
+    } else if (const cms::Topic* topic = dynamic_cast<const cms::Topic*>(destination)) {
+        *amqDestination = new ActiveMQTopic(topic->getTopicName());
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Resolves a CMS message to an ActiveMQ commands::Message.
+//
+// If the given message already is a commands::Message the same object is handed
+// back through amqMessage and false is returned.  Otherwise a new ActiveMQ
+// message of the matching kind is allocated, the given connection is set on it,
+// the supported parts of the body are copied, copyProperties() is applied, and
+// true is returned; the caller then owns the new object.
+//
+// Throws cms::CMSException if amqMessage or message is NULL, or if the message
+// is a foreign MapMessage / StreamMessage, which cannot be converted yet.
+bool ActiveMQMessageTransformation::transformMessage(cms::Message* message, ActiveMQConnection* connection, Message** amqMessage) {
+
+    if (amqMessage == NULL) {
+        throw cms::CMSException("Provided target ActiveMQMessage pointer was NULL");
+    }
+
+    // Fast path: the message is already one of ours, no copy is made.
+    *amqMessage = dynamic_cast<Message*>(message);
+    if (*amqMessage != NULL) {
+        return false;
+    }
+
+    // A NULL source would otherwise reach copyProperties() and be dereferenced.
+    if (message == NULL) {
+        throw cms::CMSException("Provided source cms::Message pointer was NULL");
+    }
+
+    if (cms::BytesMessage* bytesMsg = dynamic_cast<cms::BytesMessage*>(message)) {
+        bytesMsg->reset();
+        ActiveMQBytesMessage* msg = new ActiveMQBytesMessage();
+        // Publish the new message right away so it is owned by the out parameter.
+        *amqMessage = msg;
+        msg->setConnection(connection);
+        try {
+            for (;;) {
+                // Reads a byte from the message stream until the stream is empty
+                msg->writeByte(bytesMsg->readByte());
+            }
+        } catch (cms::MessageEOFException&) {
+            // End of the source stream reached, this is the expected loop exit.
+        } catch (cms::CMSException&) {
+            // Best effort, as before: keep whatever bytes were copied so far.
+        }
+    } else if (dynamic_cast<cms::MapMessage*>(message) != NULL) {
+        // TODO Need type infos for map elements before the entries can be copied.
+        throw cms::CMSException("Transformation of foreign cms::MapMessage instances is not supported");
+    } else if (dynamic_cast<cms::ObjectMessage*>(message) != NULL) {
+        // No body is copied for object messages; an empty message is produced.
+        ActiveMQObjectMessage* msg = new ActiveMQObjectMessage();
+        *amqMessage = msg;
+        msg->setConnection(connection);
+    } else if (dynamic_cast<cms::StreamMessage*>(message) != NULL) {
+        // TODO Need element enumeration for StreamMessage before it can be copied.
+        throw cms::CMSException("Transformation of foreign cms::StreamMessage instances is not supported");
+    } else if (cms::TextMessage* textMsg = dynamic_cast<cms::TextMessage*>(message)) {
+        ActiveMQTextMessage* msg = new ActiveMQTextMessage();
+        *amqMessage = msg;
+        msg->setConnection(connection);
+        msg->setText(textMsg->getText());
+    } else {
+        *amqMessage = new ActiveMQMessage();
+        (*amqMessage)->setConnection(connection);
+    }
+
+    try {
+        ActiveMQMessageTransformation::copyProperties(message, dynamic_cast<cms::Message*>(*amqMessage));
+    } catch (...) {
+        // The caller never sees the return value on this path, so release the
+        // freshly created message here instead of leaking it.
+        delete *amqMessage;
+        *amqMessage = NULL;
+        throw;
+    }
+
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Transfers header values from fromMessage onto toMessage.
+//
+// Only the CMS message ID and the CMS correlation ID are transferred at the
+// moment; the remaining headers and the user defined properties are still
+// disabled below.  Neither pointer is checked, both must be non-NULL.
+void ActiveMQMessageTransformation::copyProperties(const cms::Message* fromMessage, cms::Message* toMessage) {
+
+    const std::string messageId = fromMessage->getCMSMessageID();
+    toMessage->setCMSMessageID(messageId);
+
+    const std::string correlationId = fromMessage->getCMSCorrelationID();
+    toMessage->setCMSCorrelationID(correlationId);
+
+    // Not yet enabled:
+//    toMessage->setCMSReplyTo(transformDestination(fromMessage->getCMSReplyTo()));
+//    toMessage->setCMSDestination(transformDestination(fromMessage->getCMSDestination()));
+//    toMessage->setCMSDeliveryMode(fromMessage->getCMSDeliveryMode());
+//    toMessage->setCMSRedelivered(fromMessage->getCMSRedelivered());
+//    toMessage->setCMSType(fromMessage->getCMSType());
+//    toMessage->setCMSExpiration(fromMessage->getCMSExpiration());
+//    toMessage->setCMSPriority(fromMessage->getCMSPriority());
+//    toMessage->setCMSTimestamp(fromMessage->getCMSTimestamp());
+//
+//    std::vector<std::string> propertyNames = fromMessage->getPropertyNames();
+//
+//    std::vector<std::string>::const_iterator iter = propertyNames.begin();
+//    for(; iter != propertyNames.end(); ++iter) {
+//        std::string name = *iter;
+        // TODO Need type values for properties
+//        Object obj = fromMessage.getObjectProperty(name);
+//        toMessage->setObjectProperty(name, obj);
+//    }
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.h?rev=1334183&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.h Fri May  4 21:04:52 2012
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_UTIL_ACTIVEMQMESSAGETRANSFORMATION_H_
+#define _ACTIVEMQ_UTIL_ACTIVEMQMESSAGETRANSFORMATION_H_
+
+#include <cms/Message.h>
+#include <cms/Destination.h>
+
+#include <activemq/util/Config.h>
+
+#include <string>
+
+namespace activemq {
+namespace core {
+    class ActiveMQConnection;
+}
+namespace commands {
+    class Message;
+    class ActiveMQDestination;
+}
+namespace util {
+
+    /**
+     * Collection of static helpers that convert CMS destinations and messages
+     * into their ActiveMQ implementations.  Every operation is static and the
+     * only declared constructor is private.
+     */
+    class AMQCPP_API ActiveMQMessageTransformation {
+    private:
+
+        ActiveMQMessageTransformation();
+
+    public:
+
+        virtual ~ActiveMQMessageTransformation();
+
+        /**
+         * Creates a fast shallow copy of the current ActiveMQDestination or creates a whole new
+         * destination instance from an available CMS destination from another provider.
+         *
+         * This method will return true if the passed CMS Destination was cloned and a new Destination
+         * object created or false if the input Destination was already an ActiveMQ destination.  The
+         * caller should use the return value as a hint to determine if it needs to delete the
+         * amqDestination object or not.
+         *
+         * @param destination
+         *      Destination to be converted into ActiveMQ's implementation.
+         * @param amqDestination
+         *      Pointer to a pointer where the casted or cloned AMQ destination is stored.
+         *
+         * @return true if the amqDestination is a new instance and not just a cast of the input destination.
+         *
+         * @throws CMSException if an error occurs
+         */
+        static bool transformDestination(const cms::Destination* destination, const commands::ActiveMQDestination** amqDestination);
+
+        /**
+         * Creates a fast shallow copy of the current ActiveMQMessage or creates a whole new
+         * message instance from an available CMS message from another provider.
+         *
+         * This method will return true if the passed CMS Message was cloned and a new ActiveMQMessage
+         * object created or false if the input Message was already an ActiveMQMessage instance.  The
+         * caller should use the return value as a hint to determine if it needs to delete the resulting
+         * ActiveMQMessage object or not.
+         *
+         * @param message
+         *      CMS Message to be converted into ActiveMQ's implementation.
+         * @param connection
+         *      Connection that is set on any newly created ActiveMQ message.
+         * @param amqMessage
+         *      Pointer to a pointer where the casted or cloned AMQ message is stored.
+         *
+         * @return true if the amqMessage is a new instance and not just a cast of the input message.
+         *
+         * @throws CMSException if an error occurs
+         */
+        static bool transformMessage(cms::Message* message, core::ActiveMQConnection* connection, commands::Message** amqMessage);
+
+        /**
+         * Copies the standard CMS and user defined properties from the given message to the
+         * specified message.
+         *
+         * NOTE: the current implementation only copies the CMS message ID and the CMS
+         * correlation ID; the other headers and the user defined properties are not copied yet.
+         *
+         * @param fromMessage
+         *      The message to take the properties from.
+         * @param toMessage
+         *      The message to add the properties to.
+         *
+         * @throws CMSException if an error occurs during the copy.
+         */
+        static void copyProperties(const cms::Message* fromMessage, cms::Message* toMessage);
+
+    };
+
+}}
+
+#endif /* _ACTIVEMQ_UTIL_ACTIVEMQMESSAGETRANSFORMATION_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ActiveMQMessageTransformation.h
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message