activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1082497 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core: ActiveMQConnection.cpp ActiveMQConnection.h ActiveMQConsumer.cpp ActiveMQSession.cpp ActiveMQSession.h
Date Thu, 17 Mar 2011 14:20:46 GMT
Author: tabish
Date: Thu Mar 17 14:20:45 2011
New Revision: 1082497

URL: http://svn.apache.org/viewvc?rev=1082497&view=rev
Log:
Attempting to fix: https://issues.apache.org/jira/browse/AMQCPP-355

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1082497&r1=1082496&r2=1082497&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Thu Mar 17 14:20:45 2011
@@ -1193,8 +1193,8 @@ transport::Transport& ActiveMQConnection
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-threads::Scheduler& ActiveMQConnection::getScheduler() const {
-    return *( this->config->scheduler );
+Pointer<Scheduler> ActiveMQConnection::getScheduler() const {
+    return this->config->scheduler;
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1082497&r1=1082496&r2=1082497&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Thu Mar 17 14:20:45 2011
@@ -595,7 +595,7 @@ namespace core{
          *
          * @return a reference to a Scheduler instance owned by this Connection.
          */
-        threads::Scheduler& getScheduler() const;
+        Pointer<threads::Scheduler> getScheduler() const;
 
         /**
          * Returns the Id of the Resource Manager that this client will use should

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=1082497&r1=1082496&r2=1082497&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Thu Mar 17 14:20:45 2011
@@ -42,6 +42,7 @@
 #include <activemq/core/FifoMessageDispatchChannel.h>
 #include <activemq/core/SimplePriorityMessageDispatchChannel.h>
 #include <activemq/core/RedeliveryPolicy.h>
+#include <activemq/threads/Scheduler.h>
 #include <cms/ExceptionListener.h>
 #include <memory>
 
@@ -51,6 +52,7 @@ using namespace activemq::util;
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
+using namespace activemq::threads;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util;
@@ -84,6 +86,7 @@ namespace core {
         long long redeliveryDelay;
         Pointer<RedeliveryPolicy> redeliveryPolicy;
         Pointer<Exception> failureError;
+        Pointer<Scheduler> scheduler;
 
         ActiveMQConsumerMembers() : listener(NULL),
                                     listenerMutex(),
@@ -100,7 +103,8 @@ namespace core {
                                     inProgressClearRequiredFlag(false),
                                     redeliveryDelay(0),
                                     redeliveryPolicy(),
-                                    failureError() {
+                                    failureError(),
+                                    scheduler() {
         }
 
     };
@@ -258,6 +262,45 @@ namespace core {
         }
     };
 
+    /**
+     * Class used to Start a Consumer's dispatch queue asynchronously from the
+     * configured Scheduler.
+     */
+    class StartConsumerTask : public Runnable {
+    private:
+
+        ActiveMQConsumer* consumer;
+
+    private:
+
+        StartConsumerTask( const StartConsumerTask& );
+        StartConsumerTask& operator= ( const StartConsumerTask& );
+
+    public:
+
+        StartConsumerTask( ActiveMQConsumer* consumer ) : Runnable(), consumer(NULL) {
+
+            if( consumer == NULL ) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
+            }
+
+            this->consumer = consumer;
+        }
+
+        virtual ~StartConsumerTask() {}
+
+        virtual void run() {
+            try{
+                if(!this->consumer->isClosed()) {
+                    this->consumer->start();
+                }
+            } catch(cms::CMSException& ex) {
+                // TODO - Need Connection onAsyncException method.
+            }
+        }
+    };
+
 }}
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -317,6 +360,7 @@ ActiveMQConsumer::ActiveMQConsumer( Acti
     this->internal->listener = NULL;
     this->internal->redeliveryDelay = 0;
     this->internal->redeliveryPolicy.reset( this->session->getConnection()->getRedeliveryPolicy()->clone() );
+    this->internal->scheduler = this->session->getScheduler();
 
     if( this->session->getConnection()->isMessagePrioritySupported() ) {
         this->internal->unconsumedMessages.reset( new SimplePriorityMessageDispatchChannel() );
@@ -1049,19 +1093,10 @@ void ActiveMQConsumer::rollback() {
                 }
 
                 if( internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed() ) {
-                    // TODO
+                    // TODO - Can't do this until we can control object lifetime.
                     // Start up the delivery again a little later.
-                    //scheduler.executeAfterDelay(new Runnable() {
-                    //    public void run() {
-                    //        try {
-                    //            if( !started.get() ) {
-                    //                start();
-                    //            }
-                    //        } catch( CMSException& e ) {
-                    //            session.connection.onAsyncException(e);
-                    //        }
-                    //    }
-                    //}, redeliveryDelay);
+                    // this->internal->scheduler->executeAfterDelay(
+                    //    new StartConsumerTask(this), internal->redeliveryDelay);
                     start();
                 } else {
                     start();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=1082497&r1=1082496&r2=1082497&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Thu Mar 17 14:20:45 2011
@@ -63,6 +63,7 @@ using namespace activemq::util;
 using namespace activemq::core;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
+using namespace activemq::threads;
 using namespace decaf::util;
 using namespace decaf::util::concurrent;
 using namespace decaf::lang;
@@ -165,19 +166,16 @@ namespace core{
 
     public:
 
-        /**
-         * Bool to indicate if the Session has added a Syncronization to a TransactionContext.
-         */
         bool synchronizationRegistered;
-
-        /**
-         * Map of producers.
-         */
         decaf::util::concurrent::CopyOnWriteArrayList<ActiveMQProducer*> producers;
+        Pointer<Scheduler> scheduler;
 
     public:
 
-        SessionConfig() : synchronizationRegistered( false ), producers() {}
+        SessionConfig() : synchronizationRegistered( false ),
+                          producers(),
+                          scheduler()
+        {}
 
         ~SessionConfig() {}
     };
@@ -226,6 +224,9 @@ ActiveMQSession::ActiveMQSession( Active
 
     this->connection->addSession( this );
 
+    // Use the Connection's Scheduler.
+    this->config->scheduler = this->connection->getScheduler();
+
     // If the connection is already started, start the session.
     if( this->connection->isStarted() ) {
         this->start();
@@ -432,7 +433,7 @@ void ActiveMQSession::clearMessagesInPro
         for( ; iter != consumers.end(); ++iter ) {
             (*iter)->inProgressClearRequired();
 
-            this->connection->getScheduler().executeAfterDelay(
+            this->connection->getScheduler()->executeAfterDelay(
                 new ClearConsumerTask(*iter), 0LL);
         }
     }
@@ -923,6 +924,11 @@ cms::ExceptionListener* ActiveMQSession:
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<Scheduler> ActiveMQSession::getScheduler() const {
+    return this->config->scheduler;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::unsubscribe( const std::string& name ) {
 
     try{

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1082497&r1=1082496&r2=1082497&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Mar 17 14:20:45 2011
@@ -34,6 +34,7 @@
 #include <activemq/core/Dispatcher.h>
 #include <activemq/core/MessageDispatchChannel.h>
 #include <activemq/util/LongSequenceGenerator.h>
+#include <activemq/threads/Scheduler.h>
 
 #include <decaf/lang/Pointer.h>
 #include <decaf/util/StlMap.h>
@@ -308,6 +309,11 @@ namespace core{
         }
 
         /**
+         * Gets a Pointer to this Session's Scheduler instance
+         */
+        Pointer<threads::Scheduler> getScheduler() const;
+
+        /**
          * Gets the currently set Last Delivered Sequence Id
          *
          * @returns long long containing the sequence id of the last delivered Message.



Mime
View raw message