activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1338190 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels: ActiveMQConsumerKernel.cpp ActiveMQSessionKernel.cpp ActiveMQSessionKernel.h
Date Mon, 14 May 2012 13:21:53 GMT
Author: tabish
Date: Mon May 14 13:21:53 2012
New Revision: 1338190

URL: http://svn.apache.org/viewvc?rev=1338190&view=rev
Log:
Ensure that a scheduled task for a MessageConsumer keeps the consumer alive so that it does
not segfault if the task executes after the consumer was closed.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1338190&r1=1338189&r2=1338190&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
Mon May 14 13:21:53 2012
@@ -267,7 +267,7 @@ namespace kernels {
     class StartConsumerTask : public Runnable {
     private:
 
-        ActiveMQConsumerKernel* consumer;
+        Pointer<ActiveMQConsumerKernel> consumer;
 
     private:
 
@@ -276,14 +276,11 @@ namespace kernels {
 
     public:
 
-        StartConsumerTask(ActiveMQConsumerKernel* consumer) : Runnable(), consumer(NULL) {
-
+        StartConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
             if (consumer == NULL) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Synchronization Created with NULL Consumer.");
             }
-
-            this->consumer = consumer;
         }
 
         virtual ~StartConsumerTask() {}
@@ -612,7 +609,7 @@ cms::Message* ActiveMQConsumerKernel::re
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumerKernel::receive( int millisecs ) {
+cms::Message* ActiveMQConsumerKernel::receive(int millisecs) {
 
     try {
 
@@ -1049,8 +1046,10 @@ void ActiveMQConsumerKernel::rollback() 
                 }
 
                 if (internal->redeliveryDelay > 0 && !this->internal->unconsumedMessages->isClosed()) {
+                    Pointer<ActiveMQConsumerKernel> self =
+                        this->session->lookupConsumerKernel(this->consumerInfo->getConsumerId());
                     this->internal->scheduler->executeAfterDelay(
-                        new StartConsumerTask(this), internal->redeliveryDelay);
+                        new StartConsumerTask(self), internal->redeliveryDelay);
                 } else {
                     start();
                 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1338190&r1=1338189&r2=1338190&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
Mon May 14 13:21:53 2012
@@ -623,8 +623,8 @@ cms::MessageProducer* ActiveMQSessionKer
         }
 
         // Create the producer instance.
-        Pointer<ActiveMQProducerKernel> producer( new ActiveMQProducerKernel(
-            this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
+        Pointer<ActiveMQProducerKernel> producer(new ActiveMQProducerKernel(
+            this, this->getNextProducerId(), dest, this->connection->getSendTimeout()));
 
         try {
             this->addProducer(producer);
@@ -1160,7 +1160,7 @@ void ActiveMQSessionKernel::addConsumer(
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionKernel::removeConsumer(const Pointer<ConsumerId>& consumerId) {
+void ActiveMQSessionKernel::removeConsumer(Pointer<ConsumerId> consumerId) {
 
     try {
 
@@ -1204,6 +1204,33 @@ void ActiveMQSessionKernel::removeProduc
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Pointer<ActiveMQProducerKernel> ActiveMQSessionKernel::lookupProducerKernel(Pointer<ProducerId> id) {
+
+    std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+
+    while (producerIter->hasNext()) {
+        Pointer<ActiveMQProducerKernel> producer = producerIter->next();
+        if (producer->getProducerId()->equals(*id)) {
+            return producer;
+        }
+    }
+
+    return Pointer<ActiveMQProducerKernel>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<ActiveMQConsumerKernel> ActiveMQSessionKernel::lookupConsumerKernel(Pointer<ConsumerId> id) {
+
+    synchronized(&this->consumers) {
+        if (this->consumers.containsKey(id)) {
+            return this->consumers.get(id);
+        }
+    }
+
+    return Pointer<ActiveMQConsumerKernel>();
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSessionKernel::setPrefetchSize(Pointer<ConsumerId> id, int prefetch) {
 
     synchronized(&this->consumers) {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1338190&r1=1338189&r2=1338190&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
Mon May 14 13:21:53 2012
@@ -402,7 +402,7 @@ namespace kernels {
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void addConsumer(Pointer<activemq::core::kernels::ActiveMQConsumerKernel> consumer);
+        void addConsumer(Pointer<ActiveMQConsumerKernel> consumer);
 
         /**
          * Dispose of a MessageConsumer from this session.  Removes it from the Connection
@@ -413,7 +413,7 @@ namespace kernels {
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeConsumer(const Pointer<commands::ConsumerId>& consumerId);
+        void removeConsumer(Pointer<commands::ConsumerId> consumerId);
 
         /**
          * Adds a MessageProducer to this session registering it with the Connection and store
@@ -425,7 +425,7 @@ namespace kernels {
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void addProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> producer);
+        void addProducer(Pointer<ActiveMQProducerKernel> producer);
 
         /**
          * Dispose of a MessageProducer from this session.  Removes it from the Connection
@@ -436,7 +436,7 @@ namespace kernels {
          *
          * @throw ActiveMQException if an internal error occurs.
          */
-        void removeProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> producer);
+        void removeProducer(Pointer<ActiveMQProducerKernel> producer);
 
         /**
          * Starts if not already start a Transaction for this Session.  If the session
@@ -533,6 +533,16 @@ namespace kernels {
          */
         bool isInUse(Pointer<commands::ActiveMQDestination> destination);
 
+        /**
+         * @returns a Pointer to an ActiveMQProducerKernel using its ProducerId, or NULL.
+         */
+        Pointer<ActiveMQProducerKernel> lookupProducerKernel(Pointer<commands::ProducerId> id);
+
+        /**
+         * @returns a Pointer to an ActiveMQConsumerKernel using its ConsumerId, or NULL.
+         */
+        Pointer<ActiveMQConsumerKernel> lookupConsumerKernel(Pointer<commands::ConsumerId> id);
+
    private:
 
        /**



Mime
View raw message