activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject git commit: fix https://issues.apache.org/jira/browse/AMQCPP-509
Date Fri, 06 Sep 2013 21:54:32 GMT
Updated Branches:
  refs/heads/trunk 22b2b7570 -> 66103dd16


fix https://issues.apache.org/jira/browse/AMQCPP-509

Adds support for alwaysSessionAsync option so that threads in Session
can be bypassed if needed. 

Project: http://git-wip-us.apache.org/repos/asf/activemq-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-cpp/commit/66103dd1
Tree: http://git-wip-us.apache.org/repos/asf/activemq-cpp/tree/66103dd1
Diff: http://git-wip-us.apache.org/repos/asf/activemq-cpp/diff/66103dd1

Branch: refs/heads/trunk
Commit: 66103dd16416e162a329512034fc46f4fcfd271b
Parents: 22b2b75
Author: Timothy Bish <tabish121@gmai.com>
Authored: Fri Sep 6 17:39:02 2013 -0400
Committer: Timothy Bish <tabish121@gmai.com>
Committed: Fri Sep 6 17:54:23 2013 -0400

----------------------------------------------------------------------
 .../main/activemq/core/ActiveMQConnection.cpp   | 19 +++++++++++++---
 .../src/main/activemq/core/ActiveMQConnection.h | 17 +++++++++++++--
 .../activemq/core/ActiveMQConnectionFactory.cpp | 15 +++++++++++++
 .../activemq/core/ActiveMQConnectionFactory.h   | 22 +++++++++++++++++--
 .../activemq/core/ActiveMQSessionExecutor.cpp   | 13 ++++++++---
 .../core/kernels/ActiveMQSessionKernel.cpp      | 16 ++++++++++++--
 .../core/kernels/ActiveMQSessionKernel.h        | 14 ++++++++++++
 .../test/openwire/OpenwireSimpleTest.cpp        | 23 ++++++++++++++++++++
 .../activemq/test/openwire/OpenwireSimpleTest.h |  2 ++
 9 files changed, 129 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
index 0569f97..a8e518d 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
@@ -90,8 +90,8 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
 
     class ConnectionThreadFactory : public ThreadFactory {
     private:
@@ -176,6 +176,7 @@ namespace core{
         bool exclusiveConsumer;
         bool transactedIndividualAck;
         bool nonBlockingRedelivery;
+        bool alwaysSessionAsync;
         int compressionLevel;
         unsigned int sendTimeout;
         unsigned int closeTimeout;
@@ -243,6 +244,7 @@ namespace core{
                              exclusiveConsumer(false),
                              transactedIndividualAck(false),
                              nonBlockingRedelivery(false),
+                             alwaysSessionAsync(true),
                              compressionLevel(-1),
                              sendTimeout(0),
                              closeTimeout(15000),
@@ -544,7 +546,8 @@ cms::Session* ActiveMQConnection::createSession(cms::Session::AcknowledgeMode ac
         // Create the session instance as a Session Kernel we then create and return a
         // ActiveMQSession instance that acts as a proxy to the kernel caller can delete
         // that at any time since we only refer to the Pointer to the session kernel.
-        Pointer<ActiveMQSessionKernel> session(new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties));
+        Pointer<ActiveMQSessionKernel> session(
+            new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties));
 
         session->setMessageTransformer(this->config->transformer);
 
@@ -1924,3 +1927,13 @@ bool ActiveMQConnection::isDuplicate(Dispatcher* dispatcher, Pointer<commands::M
 void ActiveMQConnection::rollbackDuplicate(Dispatcher* dispatcher, Pointer<commands::Message> message) {
     this->config->connectionAudit.rollbackDuplicate(dispatcher, message);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnection::isAlwaysSessionAsync() const {
+    return this->config->alwaysSessionAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::setAlwaysSessionAsync(bool alwaysSessionAsync) {
+    this->config->alwaysSessionAsync = alwaysSessionAsync;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
index 4b5a10c..08e5d24 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
@@ -41,8 +41,8 @@
 #include <string>
 #include <memory>
 
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
 
     using decaf::lang::Pointer;
 
@@ -786,6 +786,19 @@ namespace core{
          */
         void setSendAcksAsync(bool sendAcksAsync);
 
+        /**
+         * @return Returns the alwaysSessionAsync configuration setting.
+         */
+        bool isAlwaysSessionAsync() const;
+
+        /**
+         * If this flag is not set then a separate thread is not used for dispatching messages
+         * for each Session in the Connection. However, a separate thread is always used if there
+         * is more than one session, or the session isn't in auto acknowledge or duplicates ok mode.
+         * By default this value is set to true and session dispatch happens asynchronously.
+         */
+        void setAlwaysSessionAsync(bool alwaysSessionAsync);
+
     public: // TransportListener
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
index 13d680f..50245be 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
@@ -89,6 +89,7 @@ namespace core{
         bool exclusiveConsumer;
         bool transactedIndividualAck;
         bool nonBlockingRedelivery;
+        bool alwaysSessionAsync;
         int compressionLevel;
         unsigned int sendTimeout;
         unsigned int closeTimeout;
@@ -123,6 +124,7 @@ namespace core{
                             exclusiveConsumer(false),
                             transactedIndividualAck(false),
                             nonBlockingRedelivery(false),
+                            alwaysSessionAsync(true),
                             compressionLevel(-1),
                             sendTimeout(0),
                             closeTimeout(15000),
@@ -216,6 +218,8 @@ namespace core{
                 properties->getProperty("connection.nonBlockingRedelivery", Boolean::toString(nonBlockingRedelivery)));
             this->watchTopicAdvisories = Boolean::parseBoolean(
                 properties->getProperty("connection.watchTopicAdvisories", Boolean::toString(watchTopicAdvisories)));
+            this->alwaysSessionAsync = Boolean::parseBoolean(
+                properties->getProperty("connection.alwaysSessionAsync", Boolean::toString(alwaysSessionAsync)));
 
             this->defaultPrefetchPolicy->configure(*properties);
             this->defaultRedeliveryPolicy->configure(*properties);
@@ -411,6 +415,7 @@ void ActiveMQConnectionFactory::configureConnection(ActiveMQConnection* connecti
     connection->setUseRetroactiveConsumer(this->settings->useRetroactiveConsumer);
     connection->setNonBlockingRedelivery(this->settings->nonBlockingRedelivery);
     connection->setConsumerFailoverRedeliveryWaitPeriod(this->settings->consumerFailoverRedeliveryWaitPeriod);
+    connection->setAlwaysSessionAsync(this->settings->alwaysSessionAsync);
 
     if (this->settings->defaultListener) {
         connection->setExceptionListener(this->settings->defaultListener);
@@ -732,3 +737,13 @@ bool ActiveMQConnectionFactory::isExclusiveConsumer() const {
 void ActiveMQConnectionFactory::setExclusiveConsumer(bool exclusiveConsumer) {
     this->settings->exclusiveConsumer = exclusiveConsumer;
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQConnectionFactory::isAlwaysSessionAsync() const {
+    return this->settings->alwaysSessionAsync;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnectionFactory::setAlwaysSessionAsync(bool alwaysSessionAsync) {
+    this->settings->alwaysSessionAsync = alwaysSessionAsync;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
index e2be727..3828f70 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
@@ -27,8 +27,8 @@
 #include <decaf/net/URI.h>
 #include <decaf/util/Properties.h>
 
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
 
     using decaf::lang::Pointer;
 
@@ -640,6 +640,24 @@ namespace core{
          */
         void setOptimizedAckScheduledAckInterval(long long optimizedAckScheduledAckInterval);
 
+        /**
+         * Returns the current value of the always session async option.
+         *
+         * @return Returns the alwaysSessionAsync configuration setting.
+         */
+        bool isAlwaysSessionAsync() const;
+
+        /**
+         * If this flag is not set 'true' then a separate thread is not used for dispatching messages
+         * for each Session in the Connection. However, a separate thread is always used if there
+         * is more than one session, or the session isn't in auto acknowledge or duplicates ok mode.
+         * By default this value is set to true and session dispatch happens asynchronously.
+         *
+         * @param alwaysSessionAsync
+         * 		The alwaysSessionAsync value to use when creating new sessions.
+         */
+        void setAlwaysSessionAsync(bool alwaysSessionAsync);
+
     public:
 
         /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp b/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
index 48a919c..5392950 100644
--- a/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
+++ b/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
@@ -81,9 +81,12 @@ ActiveMQSessionExecutor::~ActiveMQSessionExecutor() {
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::execute(const Pointer<MessageDispatch>& dispatch) {
 
-    // Add the data to the queue.
-    this->messageQueue->enqueue(dispatch);
-    this->wakeup();
+    if (this->session->isSessionAsyncDispatch()) {
+        this->messageQueue->enqueue(dispatch);
+        this->wakeup();
+    } else {
+        this->dispatch(dispatch);
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -97,6 +100,10 @@ void ActiveMQSessionExecutor::executeFirst(const Pointer<MessageDispatch>& dispa
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionExecutor::wakeup() {
 
+    if (!this->session->isSessionAsyncDispatch()) {
+        return;
+    }
+
     Pointer<TaskRunner> taskRunner;
     synchronized(messageQueue.get()) {
         if (this->taskRunner == NULL) {

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
index 0f320e8..776487f 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
@@ -100,13 +100,14 @@ namespace kernels{
         Mutex sendMutex;
         cms::MessageTransformer* transformer;
         int hashCode;
+        bool sessionAsyncDispatch;
 
     public:
 
         SessionConfig() : synchronizationRegistered(false),
                           producerLock(), producers(), consumerLock(), consumers(),
                           scheduler(), closeSync(), sendMutex(), transformer(NULL),
-                          hashCode() {}
+                          hashCode(), sessionAsyncDispatch(true) {}
         ~SessionConfig() {}
     };
 
@@ -226,8 +227,9 @@ ActiveMQSessionKernel::ActiveMQSessionKernel(ActiveMQConnection* connection,
 
     this->closed.set(false);
     this->lastDeliveredSequenceId = -1;
+    this->config->sessionAsyncDispatch = connection->isAlwaysSessionAsync();
 
-    // Create a Transaction objet
+    // Create a Transaction object
     this->transaction.reset(new ActiveMQTransactionContext(this, properties));
 
     // Create the session executor object.
@@ -1495,3 +1497,13 @@ void ActiveMQSessionKernel::sendAck(Pointer<MessageAck> ack, bool async) {
         this->connection->syncRequest(ack);
     }
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQSessionKernel::isSessionAsyncDispatch() const {
+    return this->config->sessionAsyncDispatch;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSessionKernel::setSessionAsyncDispatch(bool sessionAsyncDispatch) {
+    this->config->sessionAsyncDispatch = sessionAsyncDispatch;
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
index f4af526..f05c56a 100644
--- a/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
+++ b/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
@@ -568,6 +568,20 @@ namespace kernels {
          */
         void sendAck(decaf::lang::Pointer<commands::MessageAck> ack, bool async = false);
 
+        /**
+         * Returns true if this session is dispatching messages to its consumers asynchronously.
+         *
+         * @return Returns the sessionAsyncDispatch.
+         */
+        bool isSessionAsyncDispatch() const;
+
+        /**
+         * Configures asynchronous message dispatch to this session's consumers.
+         *
+         * @param sessionAsyncDispatch The sessionAsyncDispatch to set.
+         */
+        void setSessionAsyncDispatch(bool sessionAsyncDispatch);
+
    private:
 
        /**

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
index 7d3d782..5ce97d4 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp
@@ -383,3 +383,26 @@ void OpenwireSimpleTest::testMessageIdSetOnSend() {
     CPPUNIT_ASSERT(message->getCMSMessageID() != "");
     CPPUNIT_ASSERT(message->getCMSDestination() != NULL);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenwireSimpleTest::testReceiveWithSessionSyncDispatch() {
+
+    ActiveMQConnection* amqConnection = dynamic_cast<ActiveMQConnection*>(cmsProvider->getConnection());
+    amqConnection->setAlwaysSessionAsync(false);
+
+    cmsProvider->reconnectSession();
+
+    // Create CMS Object for Comms
+    cms::Session* session( cmsProvider->getSession() );
+    cms::MessageConsumer* consumer = cmsProvider->getConsumer();
+    cms::MessageProducer* producer = cmsProvider->getProducer();
+    producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
+
+    auto_ptr<cms::TextMessage> txtMessage( session->createTextMessage( "TEST MESSAGE" ) );
+
+    // Send some text messages
+    producer->send( txtMessage.get() );
+
+    auto_ptr<cms::Message> message( consumer->receive( 1000 ) );
+    CPPUNIT_ASSERT( message.get() != NULL );
+}

http://git-wip-us.apache.org/repos/asf/activemq-cpp/blob/66103dd1/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
----------------------------------------------------------------------
diff --git a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
index 3be5f00..1d399a1 100644
--- a/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
+++ b/activemq-cpp/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h
@@ -50,6 +50,7 @@ namespace openwire{
         CPPUNIT_TEST( testLibraryInitShutdownInit );
         CPPUNIT_TEST( testBytesMessageSendRecvAsync );
         CPPUNIT_TEST( testMessageIdSetOnSend );
+        CPPUNIT_TEST( testReceiveWithSessionSyncDispatch );
         CPPUNIT_TEST_SUITE_END();
 
     public:
@@ -70,6 +71,7 @@ namespace openwire{
         void tesstStreamMessage();
         void testDestroyDestination();
         void testMessageIdSetOnSend();
+        void testReceiveWithSessionSyncDispatch();
 
     };
 


Mime
View raw message