Author: tabish Date: Mon May 14 22:06:45 2012 New Revision: 1338453 URL: http://svn.apache.org/viewvc?rev=1338453&view=rev Log: Remove use of the CopyOnWriteArrayList as it currently has some bugs that can lead to segfaults Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Mon May 14 22:06:45 2012 @@ -37,6 +37,7 @@ #include #include #include +#include #include #include #include @@ -159,8 +160,8 @@ namespace core{ DispatcherMap dispatchers; ProducerMap activeProducers; - decaf::util::concurrent::CopyOnWriteArrayList< Pointer > activeSessions; - decaf::util::concurrent::CopyOnWriteArrayList transportListeners; + decaf::util::LinkedList< Pointer > activeSessions; + decaf::util::LinkedList transportListeners; TempDestinationMap activeTempDestinations; @@ -303,12 +304,14 @@ namespace core{ // Clean up the Connection resources. this->connection->cleanup(); - Pointer< Iterator > iter( this->config->transportListeners.iterator() ); + synchronized(&this->config->transportListeners) { + Pointer< Iterator > iter( this->config->transportListeners.iterator() ); - while( iter->hasNext() ) { - try{ - iter->next()->onException(ex); - } catch(...) {} + while( iter->hasNext() ) { + try{ + iter->next()->onException(ex); + } catch(...) {} + } } } catch(Exception& ex) {} } @@ -528,20 +531,22 @@ void ActiveMQConnection::close() { AMQ_CATCH_ALL_THROW_CMSEXCEPTION() } - // Get the complete list of active sessions. - std::auto_ptr< Iterator > > iter(this->config->activeSessions.iterator()); - + // Get the complete list of active sessions and call dispose() which should not trigger + // any messages back to the broker. long long lastDeliveredSequenceId = 0; + synchronized(&this->config->activeSessions) { + std::auto_ptr< Iterator > > iter(this->config->activeSessions.iterator()); - // Dispose of all the Session resources we know are still open. - while (iter->hasNext()) { - Pointer session = iter->next(); - try{ - session->dispose(); - lastDeliveredSequenceId = - Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId()); - } catch( cms::CMSException& ex ){ - /* Absorb */ + // Dispose of all the Session resources we know are still open. + while (iter->hasNext()) { + Pointer session = iter->next(); + try{ + session->dispose(); + lastDeliveredSequenceId = + Math::max(lastDeliveredSequenceId, session->getLastDeliveredSequenceId()); + } catch( cms::CMSException& ex ){ + /* Absorb */ + } } } @@ -578,16 +583,18 @@ void ActiveMQConnection::cleanup() { try { - // Get the complete list of active sessions. - std::auto_ptr< Iterator< Pointer > > iter( this->config->activeSessions.iterator() ); + synchronized(&this->config->activeSessions) { + // Get the complete list of active sessions. + std::auto_ptr< Iterator< Pointer > > iter( this->config->activeSessions.iterator() ); - // Dispose of all the Session resources we know are still open. - while (iter->hasNext()) { - Pointer session = iter->next(); - try{ - session->dispose(); - } catch( cms::CMSException& ex ){ - /* Absorb */ + // Dispose of all the Session resources we know are still open. + while (iter->hasNext()) { + Pointer session = iter->next(); + try{ + session->dispose(); + } catch( cms::CMSException& ex ){ + /* Absorb */ + } } } @@ -870,12 +877,13 @@ void ActiveMQConnection::onCommand(const this->onConsumerControl(command); } - Pointer< Iterator > iter(this->config->transportListeners.iterator()); - - while (iter->hasNext()) { - try{ - iter->next()->onCommand(command); - } catch(...) {} + synchronized(&this->config->transportListeners) { + Pointer< Iterator > iter(this->config->transportListeners.iterator()); + while (iter->hasNext()) { + try{ + iter->next()->onCommand(command); + } catch(...) {} + } } } AMQ_CATCH_RETHROW( ActiveMQException ) @@ -898,15 +906,16 @@ void ActiveMQConnection::onConsumerContr Pointer consumerControl = command.dynamicCast(); - // Get the complete list of active sessions. - std::auto_ptr< Iterator< Pointer > > iter( this->config->activeSessions.iterator() ); + synchronized(&this->config->activeSessions) { + std::auto_ptr > > iter(this->config->activeSessions.iterator()); - while (iter->hasNext()) { - Pointer session = iter->next(); - if (consumerControl->isClose()) { - session->close(consumerControl->getConsumerId()); - } else { - session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch()); + while (iter->hasNext()) { + Pointer session = iter->next(); + if (consumerControl->isClose()) { + session->close(consumerControl->getConsumerId()); + } else { + session->setPrefetchSize(consumerControl->getConsumerId(), consumerControl->getPrefetch()); + } } } } @@ -957,30 +966,33 @@ void ActiveMQConnection::transportInterr this->config->transportInterruptionProcessingComplete.reset( new CountDownLatch( (int)this->config->dispatchers.size() ) ); - std::auto_ptr< Iterator< Pointer > > sessions(this->config->activeSessions.iterator()); - - while (sessions->hasNext()) { - sessions->next()->clearMessagesInProgress(); + synchronized(&this->config->activeSessions) { + std::auto_ptr< Iterator< Pointer > > sessions(this->config->activeSessions.iterator()); + while (sessions->hasNext()) { + sessions->next()->clearMessagesInProgress(); + } } - Pointer< Iterator > listeners(this->config->transportListeners.iterator()); - - while (listeners->hasNext()) { - try{ - listeners->next()->transportInterrupted(); - } catch(...) {} + synchronized(&this->config->transportListeners) { + Pointer< Iterator > listeners(this->config->transportListeners.iterator()); + while (listeners->hasNext()) { + try{ + listeners->next()->transportInterrupted(); + } catch(...) {} + } } } //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::transportResumed() { - Pointer< Iterator > iter( this->config->transportListeners.iterator() ); - - while( iter->hasNext() ) { - try{ - iter->next()->transportResumed(); - } catch(...) {} + synchronized(&this->config->transportListeners) { + Pointer< Iterator > iter( this->config->transportListeners.iterator() ); + while( iter->hasNext() ) { + try{ + iter->next()->transportResumed(); + } catch(...) {} + } } } @@ -1120,7 +1132,9 @@ void ActiveMQConnection::addTransportLis } // Add this listener from the set of active TransportListeners - this->config->transportListeners.add(transportListener); + synchronized(&this->config->transportListeners) { + this->config->transportListeners.add(transportListener); + } } //////////////////////////////////////////////////////////////////////////////// @@ -1131,7 +1145,9 @@ void ActiveMQConnection::removeTransport } // Remove this listener from the set of active TransportListeners - this->config->transportListeners.remove(transportListener); + synchronized(&this->config->transportListeners) { + this->config->transportListeners.remove(transportListener); + } } //////////////////////////////////////////////////////////////////////////////// @@ -1438,11 +1454,13 @@ void ActiveMQConnection::deleteTempDesti checkClosedOrFailed(); ensureConnectionInfoSent(); - Pointer< Iterator< Pointer > > iterator(this->config->activeSessions.iterator()); - while (iterator->hasNext()) { - Pointer session = iterator->next(); - if (session->isInUse(destination)) { - throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination"); + synchronized(&this->config->activeSessions) { + Pointer< Iterator< Pointer > > iterator(this->config->activeSessions.iterator()); + while (iterator->hasNext()) { + Pointer session = iterator->next(); + if (session->isInUse(destination)) { + throw ActiveMQException(__FILE__, __LINE__, "A consumer is consuming from the temporary destination"); + } } } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?rev=1338453&r1=1338452&r2=1338453&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Mon May 14 22:06:45 2012 @@ -34,7 +34,6 @@ #include #include #include -#include #include #include #include @@ -47,7 +46,6 @@ namespace activemq{ namespace core{ using decaf::lang::Pointer; - using decaf::util::concurrent::atomic::AtomicBoolean; class ActiveMQSession; class ConnectionConfig; @@ -74,24 +72,24 @@ namespace core{ /** * Indicates if this Connection is started */ - AtomicBoolean started; + decaf::util::concurrent::atomic::AtomicBoolean started; /** * Indicates that this connection has been closed, it is no longer * usable after this becomes true */ - AtomicBoolean closed; + decaf::util::concurrent::atomic::AtomicBoolean closed; /** * Indicates that this connection has been closed, it is no longer * usable after this becomes true */ - AtomicBoolean closing; + decaf::util::concurrent::atomic::AtomicBoolean closing; /** * Indicates that this connection's Transport has failed. */ - AtomicBoolean transportFailed; + decaf::util::concurrent::atomic::AtomicBoolean transportFailed; private: Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Mon May 14 22:06:45 2012 @@ -56,6 +56,7 @@ #include #include #include +#include #include #include #include @@ -97,7 +98,7 @@ namespace kernels{ public: AtomicBoolean synchronizationRegistered; - decaf::util::concurrent::CopyOnWriteArrayList< Pointer > producers; + decaf::util::LinkedList< Pointer > producers; Pointer scheduler; Pointer closeSync; ConsumersMap consumers; @@ -1183,7 +1184,9 @@ void ActiveMQSessionKernel::addProducer( try { this->checkClosed(); - this->config->producers.add(producer); + synchronized(&this->config->producers) { + this->config->producers.add(producer); + } this->connection->addProducer(producer); } AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException ) @@ -1196,7 +1199,9 @@ void ActiveMQSessionKernel::removeProduc try { this->connection->removeProducer(producer->getProducerId()); - this->config->producers.remove(producer); + synchronized(&this->config->producers) { + this->config->producers.remove(producer); + } } AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) @@ -1206,12 +1211,13 @@ void ActiveMQSessionKernel::removeProduc //////////////////////////////////////////////////////////////////////////////// Pointer ActiveMQSessionKernel::lookupProducerKernel(Pointer id) { - std::auto_ptr > > producerIter(this->config->producers.iterator()); - - while (producerIter->hasNext()) { - Pointer producer = producerIter->next(); - if (producer->getProducerId()->equals(*id)) { - return producer; + synchronized(&this->config->producers) { + std::auto_ptr > > producerIter(this->config->producers.iterator()); + while (producerIter->hasNext()) { + Pointer producer = producerIter->next(); + if (producer->getProducerId()->equals(*id)) { + return producer; + } } } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp?rev=1338453&r1=1338452&r2=1338453&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.cpp Mon May 14 22:06:45 2012 @@ -30,7 +30,7 @@ using namespace decaf::lang; using namespace decaf::util; //////////////////////////////////////////////////////////////////////////////// -ServiceSupport::ServiceSupport() : Service(), started(), stopping(), stopped(true), listemers() { +ServiceSupport::ServiceSupport() : Service(), started(), stopping(), stopped(true), listeners() { } //////////////////////////////////////////////////////////////////////////////// @@ -62,9 +62,11 @@ void ServiceSupport::start() { this->stopping.set(false); this->stopped.set(!success); - std::auto_ptr< Iterator > iter(this->listemers.iterator()); - while(iter->hasNext()) { - iter->next()->started(this); + synchronized(&this->listeners) { + std::auto_ptr< Iterator > iter(this->listeners.iterator()); + while(iter->hasNext()) { + iter->next()->started(this); + } } } } @@ -84,9 +86,11 @@ void ServiceSupport::stop() { this->started.set(false); this->stopping.set(false); - std::auto_ptr< Iterator > iter(this->listemers.iterator()); - while(iter->hasNext()) { - iter->next()->stopped(this); + synchronized(&this->listeners) { + std::auto_ptr< Iterator > iter(this->listeners.iterator()); + while(iter->hasNext()) { + iter->next()->stopped(this); + } } stopper.throwFirstException(); @@ -111,13 +115,17 @@ bool ServiceSupport::isStopped() const { //////////////////////////////////////////////////////////////////////////////// void ServiceSupport::addServiceListener(ServiceListener* listener) { if(listener != NULL) { - this->listemers.add(listener); + synchronized(&this->listeners) { + this->listeners.add(listener); + } } } //////////////////////////////////////////////////////////////////////////////// void ServiceSupport::removeServiceListener(ServiceListener* listener) { if(listener != NULL) { - this->listemers.remove(listener); + synchronized(&this->listeners) { + this->listeners.remove(listener); + } } } Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h?rev=1338453&r1=1338452&r2=1338453&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/util/ServiceSupport.h Mon May 14 22:06:45 2012 @@ -22,7 +22,7 @@ #include #include -#include +#include namespace activemq { namespace util { @@ -41,7 +41,7 @@ namespace util { decaf::util::concurrent::atomic::AtomicBoolean started; decaf::util::concurrent::atomic::AtomicBoolean stopping; decaf::util::concurrent::atomic::AtomicBoolean stopped; - decaf::util::concurrent::CopyOnWriteArrayList listemers; + decaf::util::ArrayList listeners; public: