activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1463311 [1/2] - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/core/ main/activemq/core/kernels/ test/activemq/core/
Date Mon, 01 Apr 2013 21:56:01 GMT
Author: tabish
Date: Mon Apr  1 21:56:01 2013
New Revision: 1463311

URL: http://svn.apache.org/r1463311
Log:
https://issues.apache.org/jira/browse/AMQCPP-473
https://issues.apache.org/jira/browse/AMQCPP-472
https://issues.apache.org/jira/browse/AMQCPP-471

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ConnectionAuditTest.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon Apr  1 21:56:01 2013
@@ -22,6 +22,7 @@
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/core/ActiveMQConnectionMetaData.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
 #include <activemq/core/AdvisoryConsumer.h>
 #include <activemq/core/ConnectionAudit.h>
 #include <activemq/core/kernels/ActiveMQSessionKernel.h>
@@ -164,13 +165,25 @@ namespace core{
         bool dispatchAsync;
         bool alwaysSyncSend;
         bool useAsyncSend;
+        bool sendAcksAsync;
         bool messagePrioritySupported;
         bool watchTopicAdvisories;
         bool useCompression;
+        bool useRetroactiveConsumer;
+        bool checkForDuplicates;
+        bool optimizeAcknowledge;
+        bool exclusiveConsumer;
+        bool transactedIndividualAck;
+        bool nonBlockingRedelivery;
         int compressionLevel;
         unsigned int sendTimeout;
         unsigned int closeTimeout;
         unsigned int producerWindowSize;
+        int auditDepth;
+        int auditMaximumProducerNumber;
+        long long optimizeAcknowledgeTimeOut;
+        long long optimizedAckScheduledAckInterval;
+        long long consumerFailoverRedeliveryWaitPeriod;
 
         std::auto_ptr<PrefetchPolicy> defaultPrefetchPolicy;
         std::auto_ptr<RedeliveryPolicy> defaultRedeliveryPolicy;
@@ -219,13 +232,25 @@ namespace core{
                              dispatchAsync(true),
                              alwaysSyncSend(false),
                              useAsyncSend(false),
+                             sendAcksAsync(true),
                              messagePrioritySupported(true),
                              watchTopicAdvisories(true),
                              useCompression(false),
+                             useRetroactiveConsumer(false),
+                             checkForDuplicates(true),
+                             optimizeAcknowledge(false),
+                             exclusiveConsumer(false),
+                             transactedIndividualAck(false),
+                             nonBlockingRedelivery(false),
                              compressionLevel(-1),
                              sendTimeout(0),
                              closeTimeout(15000),
                              producerWindowSize(0),
+                             auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+                             auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT),
+                             optimizeAcknowledgeTimeOut(300),
+                             optimizedAckScheduledAckInterval(0),
+                             consumerFailoverRedeliveryWaitPeriod(0),
                              defaultPrefetchPolicy(NULL),
                              defaultRedeliveryPolicy(NULL),
                              exceptionListener(NULL),
@@ -560,6 +585,7 @@ void ActiveMQConnection::removeSession(P
         this->config->sessionsLock.writeLock().lock();
         try {
             this->config->activeSessions.remove(session);
+            this->config->connectionAudit.removeDispatcher(session.get());
             this->config->sessionsLock.writeLock().unlock();
         } catch (Exception& ex) {
             this->config->sessionsLock.writeLock().unlock();
@@ -1668,6 +1694,126 @@ void ActiveMQConnection::setWatchTopicAd
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnection::getAuditDepth() const {
+    return this->config->auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAuditDepth(int auditDepth) {
+    this->config->auditDepth = auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnection::getAuditMaximumProducerNumber() const {
+    return this->config->auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+    this->config->auditMaximumProducerNumber = auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isCheckForDuplicates() const {
+    return this->config->checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setCheckForDuplicates(bool checkForDuplicates) {
+    this->config->checkForDuplicates = checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isSendAcksAsync() const {
+    return this->config->sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setSendAcksAsync(bool sendAcksAsync) {
+    this->config->sendAcksAsync = sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isTransactedIndividualAck() const {
+    return this->config->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setTransactedIndividualAck(bool transactedIndividualAck) {
+    this->config->transactedIndividualAck = transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isNonBlockingRedelivery() const {
+    return this->config->nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setNonBlockingRedelivery(bool nonBlockingRedelivery) {
+    this->config->nonBlockingRedelivery = nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isOptimizeAcknowledge() const {
+    return this->config->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizeAcknowledge(bool optimizeAcknowledge) {
+    this->config->optimizeAcknowledge = optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getOptimizeAcknowledgeTimeOut() const {
+    return this->config->optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut) {
+    this->config->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getOptimizedAckScheduledAckInterval() const {
+    return this->config->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval) {
+    this->config->optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnection::getConsumerFailoverRedeliveryWaitPeriod() const {
+    return this->config->consumerFailoverRedeliveryWaitPeriod;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setConsumerFailoverRedeliveryWaitPeriod(long long value) {
+    this->config->consumerFailoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isUseRetroactiveConsumer() const {
+    return this->config->useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setUseRetroactiveConsumer(bool useRetroactiveConsumer) {
+    this->config->useRetroactiveConsumer = useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isExclusiveConsumer() const {
+    return this->config->exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setExclusiveConsumer(bool exclusiveConsumer) {
+    this->config->exclusiveConsumer = exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::addTempDestination(Pointer<ActiveMQTempDestination> destination) {
     this->config->activeTempDestinations.put(destination, destination);
 }
@@ -1759,3 +1905,18 @@ bool ActiveMQConnection::isDeleted(Point
 
     return !this->config->activeTempDestinations.containsKey(destination);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
+
+    if (this->config->checkForDuplicates) {
+        return this->config->connectionAudit.isDuplicate(dispatcher, message);
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
+    this->config->connectionAudit.rollbackDuplicate(dispatcher, message);
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon Apr  1 21:56:01 2013
@@ -238,6 +238,28 @@ namespace core{
          */
         virtual void destroyDestination(const cms::Destination* destination);
 
+        /**
+         * Allows Consumers to check if an incoming Message is a Duplicate.
+         *
+         * @param dispatcher
+         *      The Dispatcher that is checking the Message for Duplication.
+         * @param message
+         *      The Message that should be checked.
+         *
+         * @returns true if the Message was seen before.
+         */
+        bool isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
+
+        /**
+         * Mark message as received.
+         *
+         * @param dispatcher
+         *      The Dispatcher instance that has received the Message.
+         * @param message
+         *      The Message that has been received.
+         */
+        void rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message);
+
     public:   // Connection Interface Methods
 
         /**
@@ -543,6 +565,222 @@ namespace core{
          */
         void setWatchTopicAdvisories(bool value);
 
+        /**
+         * Get the audit depth for Messages for consumers when using a fault
+         * tolerant transport.  The higher the value the more messages are checked
+         * for duplication, and the larger the performance impact of duplicate
+         * detection will be.
+         *
+         * @returns the configured audit depth.
+         */
+        int getAuditDepth() const;
+
+        /**
+         * Set the audit depth for Messages for consumers when using a fault
+         * tolerant transport.  The higher the value the more messages are checked
+         * for duplication, and the larger the performance impact of duplicate
+         * detection will be.
+         *
+         * @param auditDepth
+         *      The configured audit depth.
+         */
+        void setAuditDepth(int auditDepth);
+
+        /**
+         * The number of Producers that will be audited.
+         *
+         * @returns the configured number of producers to include in the audit.
+         */
+        int getAuditMaximumProducerNumber() const;
+
+        /**
+         * The number of Producers that will be audited.
+         *
+         * @param auditMaximumProducerNumber
+         *      The configured number of producers to include in the audit.
+         */
+        void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
+
+        /**
+         * Gets the value of the configured Duplicate Message detection feature.
+         *
+         * When enabled and a fault tolerant transport is used (think failover) then
+         * this feature will help to detect and filter duplicate messages that might
+         * otherwise be delivered to a consumer after a connection failure.
+         *
+         * Disabling this can increase performance since no Message auditing will
+         * occur.
+         *
+         * @return the checkForDuplicates value currently set.
+         */
+        bool isCheckForDuplicates() const;
+
+        /**
+         * Sets the value of the configured Duplicate Message detection feature.
+         *
+         * When enabled and a fault tolerant transport is used (think failover) then
+         * this feature will help to detect and filter duplicate messages that might
+         * otherwise be delivered to a consumer after a connection failure.
+         *
+         * Disabling this can increase performance since no Message auditing will
+         * occur.
+         *
+         * @param checkForDuplicates
+         *      The checkForDuplicates value to be configured.
+         */
+        void setCheckForDuplicates(bool checkForDuplicates);
+
+        /**
+         * When true, submit individual transacted acks immediately rather than with transaction
+         * completion.  This allows the acks to represent delivery status which can be persisted on
+         * rollback.  Used in conjunction with KahaDB set to Rewrite On Redelivery.
+         *
+         * @returns true if this option is enabled.
+         */
+        bool isTransactedIndividualAck() const;
+
+        /**
+         * When true, submit individual transacted acks immediately rather than with transaction
+         * completion.  This allows the acks to represent delivery status which can be persisted on
+         * rollback.  Used in conjunction with KahaDB set to Rewrite On Redelivery.
+         *
+         * @param transactedIndividualAck
+         *      The value to set.
+         */
+        void setTransactedIndividualAck(bool transactedIndividualAck);
+
+        /**
+         * Returns true if non-blocking redelivery of Messages is configured for Consumers
+         * that are rolled back or recovered.
+         *
+         * @return true if non-blocking redelivery is enabled.
+         */
+        bool isNonBlockingRedelivery() const;
+
+        /**
+         * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
+         * from a rolled back transaction.  This implies that message order will not be preserved and
+         * also will result in the TransactedIndividualAck option to be enabled.
+         *
+         * @param nonBlockingRedelivery
+         *      The value to configure for non-blocking redelivery.
+         */
+        void setNonBlockingRedelivery(bool nonBlockingRedelivery);
+
+        /**
+         * Gets the delay period for a consumer redelivery.
+         *
+         * @returns configured time delay in milliseconds.
+         */
+        long long getConsumerFailoverRedeliveryWaitPeriod() const;
+
+        /**
+         * Sets the delay period for a consumer redelivery.
+         *
+         * @param value
+         *      The configured time delay in milliseconds.
+         */
+        void setConsumerFailoverRedeliveryWaitPeriod(long long value);
+
+        /**
+         * @return true if optimizeAcknowledge is enabled.
+         */
+        bool isOptimizeAcknowledge() const;
+
+        /**
+         * Sets if Consumers are configured to use Optimized Acknowledge by default.
+         *
+         * @param optimizeAcknowledge
+         *      The optimizeAcknowledge mode to set.
+         */
+        void setOptimizeAcknowledge(bool optimizeAcknowledge);
+
+        /**
+         * Gets the time between optimized ack batches in milliseconds.
+         *
+         * @returns time between optimized ack batches in Milliseconds.
+         */
+        long long getOptimizeAcknowledgeTimeOut() const;
+
+        /**
+         * The max time in milliseconds between optimized ack batches.
+         *
+         * @param optimizeAcknowledgeTimeOut
+         *      The time in milliseconds for optimized ack batches.
+         */
+        void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut);
+
+        /**
+         * Gets the configured time interval that is used to force all MessageConsumers that have
+         * optimizedAcknowledge enabled to send an ack for any outstanding Message Acks.  By default
+         * this value is set to zero meaning that the consumers will not do any background Message
+         * acknowledgment.
+         *
+         * @return the scheduledOptimizedAckInterval
+         */
+        long long getOptimizedAckScheduledAckInterval() const;
+
+        /**
+         * Sets the amount of time between scheduled sends of any outstanding Message Acks for
+         * consumers that have been configured with optimizeAcknowledge enabled.
+         *
+         * Time is given in Milliseconds.
+         *
+         * @param optimizedAckScheduledAckInterval
+         *      The scheduledOptimizedAckInterval to use for new Consumers.
+         */
+        void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
+
+        /**
+         * Should all created consumers be retroactive.
+         *
+         * @returns true if consumer will be created with the retroactive flag set.
+         */
+        bool isUseRetroactiveConsumer() const;
+
+        /**
+         * Sets whether or not retroactive consumers are enabled. Retroactive
+         * consumers allow non-durable topic subscribers to receive old messages
+         * that were published before the non-durable subscriber started.
+         *
+         * @param useRetroactiveConsumer
+         *      The value of this configuration option.
+         */
+        void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
+
+        /**
+         * Should all created consumers be exclusive.
+         *
+         * @returns true if consumer will be created with the exclusive flag set.
+         */
+        bool isExclusiveConsumer() const;
+
+        /**
+         * Enables or disables whether or not queue consumers should be exclusive or
+         * not for example to preserve ordering when not using Message Groups.
+         *
+         * @param exclusiveConsumer
+         *      The value of this configuration option.
+         */
+        void setExclusiveConsumer(bool exclusiveConsumer);
+
+        /**
+         * Returns whether Message acknowledgments are sent asynchronously meaning no
+         * response is required from the broker before the ack completes.
+         *
+         * @return the sendAcksAsync configured value.
+         */
+        bool isSendAcksAsync() const;
+
+        /**
+         * Sets whether Message acknowledgments are sent asynchronously meaning no
+         * response is required from the broker before the ack completes.
+         *
+         * @param sendAcksAsync
+         *      The sendAcksAsync configuration value to set.
+         */
+        void setSendAcksAsync(bool sendAcksAsync);
+
     public: // TransportListener
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Mon Apr  1 21:56:01 2013
@@ -22,6 +22,7 @@
 #include <decaf/util/concurrent/Mutex.h>
 #include <decaf/lang/Boolean.h>
 #include <decaf/lang/Integer.h>
+#include <decaf/lang/Long.h>
 #include <decaf/lang/Pointer.h>
 #include <decaf/lang/Math.h>
 #include <decaf/lang/exceptions/NullPointerException.h>
@@ -29,6 +30,7 @@
 #include <activemq/transport/TransportRegistry.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/ActiveMQMessageAudit.h>
 #include <activemq/core/policies/DefaultPrefetchPolicy.h>
 #include <activemq/core/policies/DefaultRedeliveryPolicy.h>
 #include <activemq/util/URISupport.h>
@@ -77,13 +79,25 @@ namespace core{
         bool dispatchAsync;
         bool alwaysSyncSend;
         bool useAsyncSend;
+        bool sendAcksAsync;
         bool messagePrioritySupported;
         bool useCompression;
+        bool useRetroactiveConsumer;
         bool watchTopicAdvisories;
+        bool checkForDuplicates;
+        bool optimizeAcknowledge;
+        bool exclusiveConsumer;
+        bool transactedIndividualAck;
+        bool nonBlockingRedelivery;
         int compressionLevel;
         unsigned int sendTimeout;
         unsigned int closeTimeout;
         unsigned int producerWindowSize;
+        int auditDepth;
+        int auditMaximumProducerNumber;
+        long long optimizeAcknowledgeTimeOut;
+        long long optimizedAckScheduledAckInterval;
+        long long consumerFailoverRedeliveryWaitPeriod;
 
         cms::ExceptionListener* defaultListener;
         cms::MessageTransformer* defaultTransformer;
@@ -99,13 +113,25 @@ namespace core{
                             dispatchAsync(true),
                             alwaysSyncSend(false),
                             useAsyncSend(false),
+                            sendAcksAsync(true),
                             messagePrioritySupported(true),
                             useCompression(false),
+                            useRetroactiveConsumer(false),
                             watchTopicAdvisories(true),
+                            checkForDuplicates(true),
+                            optimizeAcknowledge(false),
+                            exclusiveConsumer(false),
+                            transactedIndividualAck(false),
+                            nonBlockingRedelivery(false),
                             compressionLevel(-1),
                             sendTimeout(0),
                             closeTimeout(15000),
                             producerWindowSize(0),
+                            auditDepth(ActiveMQMessageAudit::DEFAULT_WINDOW_SIZE),
+                            auditMaximumProducerNumber(ActiveMQMessageAudit::MAXIMUM_PRODUCER_COUNT),
+                            optimizeAcknowledgeTimeOut(300),
+                            optimizedAckScheduledAckInterval(0),
+                            consumerFailoverRedeliveryWaitPeriod(0),
                             defaultListener(NULL),
                             defaultTransformer(NULL),
                             defaultPrefetchPolicy(new DefaultPrefetchPolicy()),
@@ -118,15 +144,12 @@ namespace core{
             this->properties->clear();
 
             if (uri.getQuery() != "") {
-
                 // Not a composite URI so this works fine.
                 try {
                     URISupport::parseQuery(uri.getQuery(), properties.get());
                 } catch (URISyntaxException& ex) {
                 }
-
             } else {
-
                 // Composite URI won't indicate it has a query even if it does.
                 try {
                     CompositeData composite = URISupport::parseComposite(uri);
@@ -139,45 +162,56 @@ namespace core{
             this->alwaysSyncSend = Boolean::parseBoolean(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_ALWAYSSYNCSEND), "false"));
-
             this->useAsyncSend = Boolean::parseBoolean(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_USEASYNCSEND), "false"));
-
             this->useCompression = Boolean::parseBoolean(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_USECOMPRESSION), "false"));
-
             this->compressionLevel = Integer::parseInt(
                 properties->getProperty("connection.compressionLevel", "-1"));
-
             this->messagePrioritySupported = Boolean::parseBoolean(
                 properties->getProperty("connection.messagePrioritySupported", "true"));
-
+            this->checkForDuplicates = Boolean::parseBoolean(  // duplicate message detection on/off
+                properties->getProperty("connection.checkForDuplicates", "true"));
+            this->auditDepth = Integer::parseInt(  // audit window size used for duplicate detection
+                properties->getProperty("connection.auditDepth", "2048"));
+            this->auditMaximumProducerNumber = Integer::parseInt(  // number of producers tracked by the audit
+                properties->getProperty("connection.auditMaximumProducerNumber", "64"));
             this->dispatchAsync = Boolean::parseBoolean(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_DISPATCHASYNC), "true"));
-
             this->producerWindowSize = Integer::parseInt(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_PRODUCERWINDOWSIZE), "0"));
-
             this->sendTimeout = decaf::lang::Integer::parseInt(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_SENDTIMEOUT), "0"));
-
             this->closeTimeout = decaf::lang::Integer::parseInt(
                 properties->getProperty(core::ActiveMQConstants::toString(
                     core::ActiveMQConstants::CONNECTION_CLOSETIMEOUT), "15000"));
-
             this->clientId = properties->getProperty(
                 core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_CLIENTID), clientId);
-
             this->username = properties->getProperty(
                 core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_USERNAME), username);
-
             this->password = properties->getProperty(
                 core::ActiveMQConstants::toString(core::ActiveMQConstants::PARAM_PASSWORD), password);
+            this->optimizeAcknowledge = Boolean::parseBoolean(  // each option below is stored in its own member
+                properties->getProperty("connection.optimizeAcknowledge", "false"));
+            this->exclusiveConsumer = Boolean::parseBoolean(
+                properties->getProperty("connection.exclusiveConsumer", "false"));
+            this->transactedIndividualAck = Boolean::parseBoolean(
+                properties->getProperty("connection.transactedIndividualAck", "false"));
+            this->useRetroactiveConsumer = Boolean::parseBoolean(
+                properties->getProperty("connection.useRetroactiveConsumer", "false"));
+            this->sendAcksAsync = Boolean::parseBoolean(
+                properties->getProperty("connection.sendAcksAsync", "true"));
+            this->optimizeAcknowledgeTimeOut = Long::parseLong(  // long long members: parsed with Long, not Integer
+                properties->getProperty("connection.optimizeAcknowledgeTimeOut", "300"));
+            this->optimizedAckScheduledAckInterval = Long::parseLong(
+                properties->getProperty("connection.optimizedAckScheduledAckInterval", "0"));
+            this->consumerFailoverRedeliveryWaitPeriod = Long::parseLong(
+                properties->getProperty("connection.consumerFailoverRedeliveryWaitPeriod", "0"));
 
             this->defaultPrefetchPolicy->configure(*properties);
             this->defaultRedeliveryPolicy->configure(*properties);
@@ -361,6 +395,9 @@ void ActiveMQConnectionFactory::configur
     connection->setRedeliveryPolicy(this->settings->defaultRedeliveryPolicy->clone());
     connection->setMessagePrioritySupported(this->settings->messagePrioritySupported);
     connection->setWatchTopicAdvisories(this->settings->watchTopicAdvisories);
+    connection->setCheckForDuplicates(this->settings->checkForDuplicates);
+    connection->setAuditDepth(this->settings->auditDepth);
+    connection->setAuditMaximumProducerNumber(this->settings->auditMaximumProducerNumber);
 
     if (this->settings->defaultListener) {
         connection->setExceptionListener(this->settings->defaultListener);
@@ -489,6 +526,16 @@ void ActiveMQConnectionFactory::setUseAs
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isSendAcksAsync() const {
+    return this->settings->sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setSendAcksAsync(bool sendAcksAsync) {
+    this->settings->sendAcksAsync = sendAcksAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQConnectionFactory::isUseCompression() const {
     return this->settings->useCompression;
 }
@@ -562,3 +609,113 @@ bool ActiveMQConnectionFactory::isWatchT
 void ActiveMQConnectionFactory::setWatchTopicAdvisories(bool value) {
     this->settings->watchTopicAdvisories = value;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnectionFactory::getAuditDepth() const {
+    return this->settings->auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAuditDepth(int auditDepth) {
+    this->settings->auditDepth = auditDepth;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ActiveMQConnectionFactory::getAuditMaximumProducerNumber() const {
+    return this->settings->auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAuditMaximumProducerNumber(int auditMaximumProducerNumber) {
+    this->settings->auditMaximumProducerNumber = auditMaximumProducerNumber;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isCheckForDuplicates() const {
+    return this->settings->checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setCheckForDuplicates(bool checkForDuplicates) {
+    this->settings->checkForDuplicates = checkForDuplicates;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isTransactedIndividualAck() const {
+    return this->settings->transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setTransactedIndividualAck(bool transactedIndividualAck) {
+    this->settings->transactedIndividualAck = transactedIndividualAck;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isNonBlockingRedelivery() const {
+    return this->settings->nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setNonBlockingRedelivery(bool nonBlockingRedelivery) {
+    this->settings->nonBlockingRedelivery = nonBlockingRedelivery;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isOptimizeAcknowledge() const {
+    return this->settings->optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizeAcknowledge(bool optimizeAcknowledge) {
+    this->settings->optimizeAcknowledge = optimizeAcknowledge;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getOptimizeAcknowledgeTimeOut() const {
+    return this->settings->optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut) {
+    this->settings->optimizeAcknowledgeTimeOut = optimizeAcknowledgeTimeOut;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getOptimizedAckScheduledAckInterval() const {
+    return this->settings->optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval) {
+    this->settings->optimizedAckScheduledAckInterval = optimizedAckScheduledAckInterval;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+long long ActiveMQConnectionFactory::getConsumerFailoverRedeliveryWaitPeriod() const {
+    return this->settings->consumerFailoverRedeliveryWaitPeriod;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setConsumerFailoverRedeliveryWaitPeriod(long long value) {
+    this->settings->consumerFailoverRedeliveryWaitPeriod = value;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isUseRetroactiveConsumer() const {
+    return this->settings->useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setUseRetroactiveConsumer(bool useRetroactiveConsumer) {
+    this->settings->useRetroactiveConsumer = useRetroactiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isExclusiveConsumer() const {
+    return this->settings->exclusiveConsumer;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setExclusiveConsumer(bool exclusiveConsumer) {
+    this->settings->exclusiveConsumer = exclusiveConsumer;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Mon Apr  1 21:56:01 2013
@@ -40,12 +40,11 @@ namespace core{
     class AMQCPP_API ActiveMQConnectionFactory : public cms::ConnectionFactory {
     public:
 
-        // Default Broker URI if none specified
+        // Default Broker URI if none specified 'failover:tcp://localhost:61616'
         static const std::string DEFAULT_URI;
 
     private:
 
-        // d-Pointer holding pre-configured factory settings
         FactorySettings* settings;
 
     private:
@@ -59,9 +58,13 @@ namespace core{
 
         /**
          * Constructor
-         * @param url the URI of the Broker we are connecting to.
-         * @param username to authenticate with, defaults to ""
-         * @param password to authenticate with, defaults to ""
+         *
+         * @param uri
+         *      The URI of the Broker we are connecting to.
+         * @param username
+         *      The user name to authenticate with this connection.
+         * @param password
+         *      The password to authenticate with this connection.
          */
         ActiveMQConnectionFactory(const std::string& uri,
                                   const std::string& username = "",
@@ -69,9 +72,13 @@ namespace core{
 
         /**
          * Constructor
-         * @param uri the URI of the Broker we are connecting to.
-         * @param username to authenticate with, defaults to ""
-         * @param password to authenticate with, defaults to ""
+         *
+         * @param uri
+         *      The URI of the Broker we are connecting to.
+         * @param username
+         *      The user name to authenticate with this connection.
+         * @param password
+         *      The password to authenticate with this connection.
          */
         ActiveMQConnectionFactory(const decaf::net::URI& uri,
                                   const std::string& username = "",
@@ -84,8 +91,10 @@ namespace core{
          * connection is created in stopped mode. No messages will be
          * delivered until the Connection.start method is explicitly
          * called.
+         *
          * @returns a Connection Pointer
-         * @throws CMSException
+         *
+         * @throws CMSException if an error occurs.
          */
         virtual cms::Connection* createConnection();
 
@@ -93,14 +102,20 @@ namespace core{
          * Creates a connection with the specified user identity. The
          * connection is created in stopped mode. No messages will be
          * delivered until the Connection.start method is explicitly
-         * called.  The username and password values passed here do not
+         * called.  The user name and password values passed here do not
          * change the defaults, subsequent calls to the parameterless
          * createConnection will continue to use the default values that
          * were set in the Constructor.
-         * @param username to authenticate with
-         * @param password to authenticate with
+         *
+         * @param username
+         *      The user name to authenticate with this connection.
+         * @param password
+         *      The password to authenticate with this connection.
+         *
          * @returns a Connection Pointer
-         * @throws CMSException
+         *
+         * @throws CMSSecurityException if the user credentials are invalid.
+         * @throws CMSException if an error occurs.
          */
         virtual cms::Connection* createConnection(const std::string& username,
                                                   const std::string& password);
@@ -113,12 +128,19 @@ namespace core{
          * change the defaults, subsequent calls to the parameterless
          * createConnection will continue to use the default values that
          * were set in the Constructor.
-         * @param username to authenticate with
-         * @param password to authenticate with
-         * @param clientId to assign to connection if "" then a random cleint
-         *        Id is created for this connection.
+         *
+         * @param username
+         *      The user name to authenticate with this connection.
+         * @param password
+         *      The password to authenticate with this connection.
+         * @param clientId
+         *      The client Id to assign to the connection; if "" then a random
+         *      client Id is created for this connection.
+         *
          * @returns a Connection Pointer
-         * @throws CMSException
+         *
+         * @throws CMSSecurityException if the user credentials are invalid.
+         * @throws CMSException if an error occurs.
          */
         virtual cms::Connection* createConnection(const std::string& username,
                                                   const std::string& password,
@@ -297,6 +319,23 @@ namespace core{
         void setUseAsyncSend(bool value);
 
         /**
+         * Returns whether Message acknowledgments are sent asynchronously meaning no
+         * response is required from the broker before the ack completes.
+         *
+         * @return the sendAcksAsync configured value. (defaults to true)
+         */
+        bool isSendAcksAsync() const;
+
+        /**
+         * Sets whether Message acknowledgments are sent asynchronously meaning no
+         * response is required from the broker before the ack completes.
+         *
+         * @param sendAcksAsync
+         *      The sendAcksAsync configuration value to set.
+         */
+        void setSendAcksAsync(bool sendAcksAsync);
+
+        /**
          * Gets if the Connection is configured for Message body compression.
          * @returns if the Message body will be Compressed or not.
          */
@@ -386,6 +425,39 @@ namespace core{
         void setMessagePrioritySupported(bool value);
 
         /**
+         * Should all created consumers be retroactive.
+         *
+         * @returns true if consumer will be created with the retroactive flag set.
+         */
+        bool isUseRetroactiveConsumer() const;
+
+        /**
+         * Sets whether or not retroactive consumers are enabled. Retroactive
+         * consumers allow non-durable topic subscribers to receive old messages
+         * that were published before the non-durable subscriber started.
+         *
+         * @param useRetroactiveConsumer
+         *      The value of this configuration option.
+         */
+        void setUseRetroactiveConsumer(bool useRetroactiveConsumer);
+
+        /**
+         * Should all created consumers be exclusive.
+         *
+         * @returns true if consumer will be created with the exclusive flag set.
+         */
+        bool isExclusiveConsumer() const;
+
+        /**
+         * Enables or disables whether or not queue consumers should be exclusive or
+         * not for example to preserve ordering when not using Message Groups.
+         *
+         * @param exclusiveConsumer
+         *      The value of this configuration option.
+         */
+        void setExclusiveConsumer(bool exclusiveConsumer);
+
+        /**
          * Is the Connection created by this factory configured to watch for advisory messages
          * that inform the Connection about temporary destination create / destroy.
          *
@@ -402,6 +474,172 @@ namespace core{
          */
         void setWatchTopicAdvisories(bool value);
 
+        /**
+         * Get the audit depth for Messages for consumers when using a fault
+         * tolerant transport.  The higher the value the more messages are checked
+         * for duplication, and the larger the performance impact of duplicate
+         * detection will be.
+         *
+         * @returns the configured audit depth.
+         */
+        int getAuditDepth() const;
+
+        /**
+         * Set the audit depth for Messages for consumers when using a fault
+         * tolerant transport.  The higher the value the more messages are checked
+         * for duplication, and the larger the performance impact of duplicate
+         * detection will be.
+         *
+         * @param auditDepth
+         *      The configured audit depth.
+         */
+        void setAuditDepth(int auditDepth);
+
+        /**
+         * The number of Producers that will be audited.
+         *
+         * @returns the configured number of producers to include in the audit.
+         */
+        int getAuditMaximumProducerNumber() const;
+
+        /**
+         * The number of Producers that will be audited.
+         *
+         * @param auditMaximumProducerNumber
+         *      The configured number of producers to include in the audit.
+         */
+        void setAuditMaximumProducerNumber(int auditMaximumProducerNumber);
+
+        /**
+         * Gets the value of the configured Duplicate Message detection feature.
+         *
+         * When enabled and a fault tolerant transport is used (think failover) then
+         * this feature will help to detect and filter duplicate messages that might
+         * otherwise be delivered to a consumer after a connection failure.
+         *
+         * Disabling this can increase performance since no Message auditing will
+         * occur.
+         *
+         * @return the checkForDuplicates value currently set.
+         */
+        bool isCheckForDuplicates() const;
+
+        /**
+         * Sets the value of the configured Duplicate Message detection feature.
+         *
+         * When enabled and a fault tolerant transport is used (think failover) then
+         * this feature will help to detect and filter duplicate messages that might
+         * otherwise be delivered to a consumer after a connection failure.
+         *
+         * Disabling this can increase performance since no Message auditing will
+         * occur.
+         *
+         * @param checkForDuplicates
+         *      The checkForDuplicates value to be configured.
+         */
+        void setCheckForDuplicates(bool checkForDuplicates);
+
+        /**
+         * When true, individual transacted acks are submitted immediately rather than with
+         * transaction completion.  This allows the acks to represent delivery status which can be
+         * persisted on rollback.  Used in conjunction with KahaDB set to Rewrite On Redelivery.
+         *
+         * @returns true if this option is enabled.
+         */
+        bool isTransactedIndividualAck() const;
+
+        /**
+         * When true, individual transacted acks are submitted immediately rather than with
+         * transaction completion.  This allows the acks to represent delivery status which can be
+         * persisted on rollback.  Used in conjunction with KahaDB set to Rewrite On Redelivery.
+         *
+         * @param transactedIndividualAck
+         *      The value to set.
+         */
+        void setTransactedIndividualAck(bool transactedIndividualAck);
+
+        /**
+         * Returns true if non-blocking redelivery of Messages is configured for Consumers
+         * that are rolled back or recovered.
+         *
+         * @return true if non-blocking redelivery is enabled.
+         */
+        bool isNonBlockingRedelivery() const;
+
+        /**
+         * When true a MessageConsumer will not stop Message delivery before re-delivering Messages
+         * from a rolled back transaction.  This implies that message order will not be preserved and
+         * will also result in the TransactedIndividualAck option being enabled.
+         *
+         * @param nonBlockingRedelivery
+         *      The value to configure for non-blocking redelivery.
+         */
+        void setNonBlockingRedelivery(bool nonBlockingRedelivery);
+
+        /**
+         * Gets the delay period for a consumer redelivery.
+         *
+         * @returns configured time delay in milliseconds.
+         */
+        long long getConsumerFailoverRedeliveryWaitPeriod() const;
+
+        /**
+         * Sets the delay period for a consumer redelivery.
+         *
+         * @param value
+         *      The configured time delay in milliseconds.
+         */
+        void setConsumerFailoverRedeliveryWaitPeriod(long long value);
+
+        /**
+         * @return true if optimizeAcknowledge is enabled.
+         */
+        bool isOptimizeAcknowledge() const;
+
+        /**
+         * Sets if Consumers are configured to use Optimized Acknowledge by default.
+         *
+         * @param optimizeAcknowledge
+         *      The optimizeAcknowledge mode to set.
+         */
+        void setOptimizeAcknowledge(bool optimizeAcknowledge);
+
+        /**
+         * Gets the time between optimized ack batches in milliseconds.
+         *
+         * @returns time between optimized ack batches in Milliseconds.
+         */
+        long long getOptimizeAcknowledgeTimeOut() const;
+
+        /**
+         * The max time in milliseconds between optimized ack batches.
+         *
+         * @param optimizeAcknowledgeTimeOut
+         *      The time in milliseconds for optimized ack batches.
+         */
+        void setOptimizeAcknowledgeTimeOut(long long optimizeAcknowledgeTimeOut);
+
+        /**
+         * Gets the configured time interval that is used to force all MessageConsumers that have
+         * optimizedAcknowledge enabled to send an ack for any outstanding Message Acks.  By default
+         * this value is set to zero meaning that the consumers will not do any background Message
+         * acknowledgment.
+         *
+         * @return the scheduledOptimizedAckInterval
+         */
+        long long getOptimizedAckScheduledAckInterval() const;
+
+        /**
+         * Sets the amount of time between scheduled sends of any outstanding Message Acks for
+         * consumers that have been configured with optimizeAcknowledge enabled.
+         *
+         * Time is given in Milliseconds.
+         *
+         * @param optimizedAckScheduledAckInterval
+         *      The scheduledOptimizedAckInterval to use for new Consumers.
+         */
+        void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
+
     public:
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.cpp Mon Apr  1 21:56:01 2013
@@ -19,6 +19,7 @@
 
 #include <decaf/util/LinkedHashMap.h>
 
+#include <activemq/core/Dispatcher.h>
 #include <activemq/core/ActiveMQMessageAudit.h>
 #include <activemq/commands/ActiveMQDestination.h>
 
@@ -48,7 +49,7 @@ namespace core {
 
         Mutex mutex;
         LinkedHashMap<Pointer<ActiveMQDestination>, Pointer<ActiveMQMessageAudit> > destinations;
-        LinkedHashMap<Pointer<Dispatcher>, Pointer<ActiveMQMessageAudit> > dispatchers;
+        LinkedHashMap<Dispatcher*, Pointer<ActiveMQMessageAudit> > dispatchers;
 
         ConnectionAuditImpl() : mutex(), destinations(1000), dispatchers(1000) {
         }
@@ -79,14 +80,14 @@ ConnectionAudit::~ConnectionAudit() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionAudit::removeDispatcher(Pointer<Dispatcher> dispatcher) {
+void ConnectionAudit::removeDispatcher(Dispatcher* dispatcher) {
     synchronized(&this->impl->mutex) {
         this->impl->dispatchers.remove(dispatcher);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-bool ConnectionAudit::isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+bool ConnectionAudit::isDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
 
     if (checkForDuplicates && message != NULL) {
         Pointer<ActiveMQDestination> destination = message->getDestination();
@@ -117,7 +118,7 @@ bool ConnectionAudit::isDuplicate(Pointe
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ConnectionAudit::rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message) {
+void ConnectionAudit::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
     if (checkForDuplicates && message != NULL) {
         Pointer<ActiveMQDestination> destination = message->getDestination();
         if (destination != NULL) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ConnectionAudit.h Mon Apr  1 21:56:01 2013
@@ -21,12 +21,13 @@
 #include <activemq/util/Config.h>
 
 #include <activemq/commands/Message.h>
-#include <activemq/core/Dispatcher.h>
+#include <decaf/lang/Pointer.h>
 
 namespace activemq {
 namespace core {
 
     class ConnectionAuditImpl;
+    class Dispatcher;
 
     /**
      * Provides the Auditing functionality used by Connections to attempt to
@@ -58,11 +59,11 @@ namespace core {
 
     public:
 
-        void removeDispatcher(Pointer<Dispatcher> dispatcher);
+        void removeDispatcher(Dispatcher* dispatcher);
 
-        bool isDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+        bool isDuplicate(Dispatcher* dispatcher, decaf::lang::Pointer<commands::Message> message);
 
-        void rollbackDuplicate(Pointer<Dispatcher> dispatcher, Pointer<commands::Message> message);
+        void rollbackDuplicate(Dispatcher* dispatcher, decaf::lang::Pointer<commands::Message> message);
 
     public:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h?rev=1463311&r1=1463310&r2=1463311&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/Dispatcher.h Mon Apr  1 21:56:01 2013
@@ -25,8 +25,6 @@
 namespace activemq {
 namespace core {
 
-    using decaf::lang::Pointer;
-
     /**
      * Interface for an object responsible for dispatching messages to
      * consumers.
@@ -42,7 +40,7 @@ namespace core {
          * @param message
          *      The message to be dispatched to a waiting consumer.
          */
-        virtual void dispatch(const Pointer<commands::MessageDispatch>& message) = 0;
+        virtual void dispatch(const decaf::lang::Pointer<commands::MessageDispatch>& message) = 0;
 
         /**
          * HashCode method allowing Dispatcher instances to be used in HashMap etc.



Mime
View raw message