Return-Path: X-Original-To: apmail-activemq-commits-archive@www.apache.org Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id C73879245 for ; Wed, 24 Oct 2012 20:09:45 +0000 (UTC) Received: (qmail 29053 invoked by uid 500); 24 Oct 2012 20:09:45 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 28991 invoked by uid 500); 24 Oct 2012 20:09:45 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 28984 invoked by uid 99); 24 Oct 2012 20:09:45 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 20:09:45 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 24 Oct 2012 20:09:43 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 6B4B523888E3 for ; Wed, 24 Oct 2012 20:09:00 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1401852 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Date: Wed, 24 Oct 2012 20:09:00 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20121024200900.6B4B523888E3@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Wed Oct 24 20:08:59 2012 New Revision: 1401852 URL: http://svn.apache.org/viewvc?rev=1401852&view=rev Log: fix for: https://issues.apache.org/jira/browse/AMQCPP-405 Account for advisory consumer in interruption processing. Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?rev=1401852&r1=1401851&r2=1401852&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Wed Oct 24 20:08:59 2012 @@ -408,12 +408,8 @@ namespace core{ //////////////////////////////////////////////////////////////////////////////// ActiveMQConnection::ActiveMQConnection(const Pointer transport, const Pointer properties) : - config(NULL), - connectionMetaData(new ActiveMQConnectionMetaData()), - started(false), - closed(false), - closing(false), - transportFailed(false) { + config(NULL), connectionMetaData(new ActiveMQConnectionMetaData()), started(false), + closed(false), closing(false), transportFailed(false) { Pointer configuration( new ConnectionConfig(transport, properties)); @@ -432,28 +428,26 @@ ActiveMQConnection::ActiveMQConnection(c ActiveMQConnection::~ActiveMQConnection() { try { - try{ + try { this->close(); - } catch(...) {} + } + AMQ_CATCHALL_NOTHROW() // This must happen even if exceptions occur in the Close attempt. delete this->config; } - AMQ_CATCH_NOTHROW( ActiveMQException ) - AMQ_CATCHALL_NOTHROW( ) + AMQ_CATCHALL_NOTHROW() } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConnection::addDispatcher( - const decaf::lang::Pointer& consumer, Dispatcher* dispatcher) { +void ActiveMQConnection::addDispatcher(const decaf::lang::Pointer& consumer, Dispatcher* dispatcher) { - try{ + try { synchronized(&this->config->dispatchers) { this->config->dispatchers.put(consumer, dispatcher); } } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() -} + AMQ_CATCH_ALL_THROW_CMSEXCEPTION()} //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::removeDispatcher(const decaf::lang::Pointer& consumer) { @@ -463,8 +457,7 @@ void ActiveMQConnection::removeDispatche this->config->dispatchers.remove(consumer); } } - AMQ_CATCH_ALL_THROW_CMSEXCEPTION() -} + AMQ_CATCH_ALL_THROW_CMSEXCEPTION()} //////////////////////////////////////////////////////////////////////////////// cms::Session* ActiveMQConnection::createSession() { @@ -485,8 +478,7 @@ cms::Session* ActiveMQConnection::create // Create the session instance as a Session Kernel we then create and return a // ActiveMQSession instance that acts as a proxy to the kernel caller can delete // that at any time since we only refer to the Pointer to the session kernel. - Pointer session(new ActiveMQSessionKernel( - this, getNextSessionId(), ackMode, *this->config->properties)); + Pointer session(new ActiveMQSessionKernel(this, getNextSessionId(), ackMode, *this->config->properties)); session->setMessageTransformer(this->config->transformer); @@ -570,7 +562,7 @@ std::string ActiveMQConnection::getClien } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConnection::setClientID( const std::string& clientID ) { +void ActiveMQConnection::setClientID(const std::string& clientID) { if (this->closed.get()) { throw cms::IllegalStateException("Connection is already closed", NULL); @@ -666,7 +658,7 @@ void ActiveMQConnection::close() { this->config->activeSessions.clear(); this->config->sessionsLock.writeLock().unlock(); - } catch(Exception& error) { + } catch (Exception& error) { this->config->sessionsLock.writeLock().unlock(); if (!hasException) { ex = error; @@ -677,8 +669,7 @@ void ActiveMQConnection::close() { // As TemporaryQueue and TemporaryTopic instances are bound to a connection // we should just delete them after the connection is closed to free up memory - Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator( - this->config->activeTempDestinations.values().iterator()); + Pointer > > iterator(this->config->activeTempDestinations.values().iterator()); try { while (iterator->hasNext()) { @@ -761,14 +752,14 @@ void ActiveMQConnection::cleanup() { try { // We need to use a copy since we aren't able to use CopyOnWriteArrayList ArrayList > sessions(this->config->activeSessions); - std::auto_ptr< Iterator< Pointer > > iter(sessions.iterator()); + std::auto_ptr > > iter(sessions.iterator()); // Dispose of all the Session resources we know are still open. while (iter->hasNext()) { Pointer session = iter->next(); - try{ + try { session->dispose(); - } catch( cms::CMSException& ex ){ + } catch (cms::CMSException& ex) { /* Absorb */ } } @@ -813,7 +804,7 @@ void ActiveMQConnection::start() { this->config->sessionsLock.readLock().lock(); // Start all the sessions. - std::auto_ptr > > iter(this->config->activeSessions.iterator()); + std::auto_ptr > > iter(this->config->activeSessions.iterator()); while (iter->hasNext()) { iter->next()->start(); } @@ -840,7 +831,7 @@ void ActiveMQConnection::stop() { // new messages. if (this->started.compareAndSet(true, false)) { this->config->sessionsLock.readLock().lock(); - std::auto_ptr > > iter(this->config->activeSessions.iterator()); + std::auto_ptr > > iter(this->config->activeSessions.iterator()); while (iter->hasNext()) { iter->next()->stop(); @@ -916,9 +907,9 @@ void ActiveMQConnection::disconnect(long throw e; } } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -926,7 +917,7 @@ void ActiveMQConnection::sendPullRequest try { - if (consumer->getPrefetchSize() == 0) { + if (consumer->getPrefetchSize() == 0) { Pointer messagePull(new MessagePull()); messagePull->setConsumerId(consumer->getConsumerId()); @@ -936,9 +927,9 @@ void ActiveMQConnection::sendPullRequest this->oneway(messagePull); } } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -957,16 +948,16 @@ void ActiveMQConnection::destroyDestinat command->setConnectionId(this->config->connectionInfo->getConnectionId()); command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION); - command->setDestination(Pointer (destination->cloneDataStructure())); + command->setDestination(Pointer(destination->cloneDataStructure())); // Send the message to the broker. syncRequest(command); } - AMQ_CATCH_RETHROW( NullPointerException ) - AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException ) - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(NullPointerException) + AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -981,16 +972,15 @@ void ActiveMQConnection::destroyDestinat checkClosedOrFailed(); ensureConnectionInfoSent(); - const ActiveMQDestination* amqDestination = - dynamic_cast (destination); + const ActiveMQDestination* amqDestination = dynamic_cast(destination); this->destroyDestination(amqDestination); } - AMQ_CATCH_RETHROW( NullPointerException ) - AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException ) - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(NullPointerException) + AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1031,7 +1021,7 @@ void ActiveMQConnection::onCommand(const } else if (command->isProducerAck()) { - ProducerAck* producerAck = dynamic_cast( command.get() ); + ProducerAck* producerAck = dynamic_cast(command.get()); // Get the consumer info object for this consumer. Pointer producer; @@ -1061,17 +1051,18 @@ void ActiveMQConnection::onCommand(const } synchronized(&this->config->transportListeners) { - Pointer< Iterator > iter(this->config->transportListeners.iterator()); + Pointer > iter(this->config->transportListeners.iterator()); while (iter->hasNext()) { - try{ + try { iter->next()->onCommand(command); - } catch(...) {} + } catch (...) { + } } } } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1092,7 +1083,7 @@ void ActiveMQConnection::onConsumerContr this->config->sessionsLock.readLock().lock(); try { // Get the complete list of active sessions. - std::auto_ptr< Iterator< Pointer > > iter( this->config->activeSessions.iterator() ); + std::auto_ptr > > iter(this->config->activeSessions.iterator()); while (iter->hasNext()) { Pointer session = iter->next(); @@ -1110,7 +1101,7 @@ void ActiveMQConnection::onConsumerContr } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConnection::onException( const decaf::lang::Exception& ex ) { +void ActiveMQConnection::onException(const decaf::lang::Exception& ex) { try { @@ -1121,8 +1112,8 @@ void ActiveMQConnection::onException( co this->config->executor->execute(new OnExceptionRunnable(this, config, ex.clone())); } } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1139,7 +1130,7 @@ void ActiveMQConnection::onAsyncExceptio //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::onClientInternalException(const decaf::lang::Exception& ex) { - if ( !closed.get() && !closing.get() ) { + if (!closed.get() && !closing.get()) { if (this->config->exceptionListener != NULL) { this->config->executor->execute(new OnAsyncExceptionRunnable(this, ex)); @@ -1152,12 +1143,13 @@ void ActiveMQConnection::onClientInterna //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::transportInterrupted() { - this->config->transportInterruptionProcessingComplete.reset( - new CountDownLatch( (int)this->config->dispatchers.size() ) ); + int consumers = this->config->watchTopicAdvisories ? (int) this->config->dispatchers.size() - 1 : (int) this->config->dispatchers.size(); + + this->config->transportInterruptionProcessingComplete.reset(new CountDownLatch(consumers)); this->config->sessionsLock.readLock().lock(); try { - std::auto_ptr< Iterator< Pointer > > sessions(this->config->activeSessions.iterator()); + std::auto_ptr > > sessions(this->config->activeSessions.iterator()); while (sessions->hasNext()) { sessions->next()->clearMessagesInProgress(); } @@ -1168,11 +1160,12 @@ void ActiveMQConnection::transportInterr } synchronized(&this->config->transportListeners) { - Pointer< Iterator > listeners(this->config->transportListeners.iterator()); + Pointer > listeners(this->config->transportListeners.iterator()); while (listeners->hasNext()) { - try{ + try { listeners->next()->transportInterrupted(); - } catch(...) {} + } catch (...) { + } } } } @@ -1181,11 +1174,12 @@ void ActiveMQConnection::transportInterr void ActiveMQConnection::transportResumed() { synchronized(&this->config->transportListeners) { - Pointer< Iterator > iter( this->config->transportListeners.iterator() ); - while( iter->hasNext() ) { - try{ + Pointer > iter(this->config->transportListeners.iterator()); + while (iter->hasNext()) { + try { iter->next()->transportResumed(); - } catch(...) {} + } catch (...) { + } } } } @@ -1197,10 +1191,10 @@ void ActiveMQConnection::oneway(Pointer< checkClosedOrFailed(); this->config->transport->oneway(command); } - AMQ_CATCH_EXCEPTION_CONVERT( IOException, ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_EXCEPTION_CONVERT(IOException, ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(decaf::lang::exceptions::UnsupportedOperationException, ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1218,8 +1212,7 @@ Pointer ActiveMQConnection::sy response = this->config->transport->request(command, timeout); } - commands::ExceptionResponse* exceptionResponse = - dynamic_cast (response.get()); + commands::ExceptionResponse* exceptionResponse = dynamic_cast(response.get()); if (exceptionResponse != NULL) { @@ -1267,9 +1260,7 @@ void ActiveMQConnection::asyncRequest(Po //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::checkClosed() const { if (this->isClosed()) { - throw ActiveMQException( - __FILE__, __LINE__, - "ActiveMQConnection::enforceConnected - Connection has already been closed!" ); + throw ActiveMQException(__FILE__, __LINE__, "ActiveMQConnection::enforceConnected - Connection has already been closed!"); } } @@ -1278,7 +1269,7 @@ void ActiveMQConnection::checkClosedOrFa checkClosed(); if (this->transportFailed.get() == true) { - throw ConnectionFailedException( *this->config->firstFailureError ); + throw ConnectionFailedException(*this->config->firstFailureError); } } @@ -1316,9 +1307,9 @@ void ActiveMQConnection::ensureConnectio } } } - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1405,19 +1396,17 @@ void ActiveMQConnection::signalInterrupt if (cdl->getCount() == 0) { this->config->transportInterruptionProcessingComplete.reset(NULL); - FailoverTransport* failoverTransport = - dynamic_cast(this->config->transport->narrow(typeid(FailoverTransport))); + FailoverTransport* failoverTransport = dynamic_cast(this->config->transport->narrow(typeid(FailoverTransport))); if (failoverTransport != NULL) { - failoverTransport->setConnectionInterruptProcessingComplete( - this->config->connectionInfo->getConnectionId()); + failoverTransport->setConnectionInterruptProcessingComplete(this->config->connectionInfo->getConnectionId()); } } } //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::setUsername(const std::string& username) { - this->config->connectionInfo->setUserName( username ); + this->config->connectionInfo->setUserName(username); } //////////////////////////////////////////////////////////////////////////////// @@ -1427,7 +1416,7 @@ const std::string& ActiveMQConnection::g //////////////////////////////////////////////////////////////////////////////// void ActiveMQConnection::setPassword(const std::string& password) { - this->config->connectionInfo->setPassword( password ); + this->config->connectionInfo->setPassword(password); } //////////////////////////////////////////////////////////////////////////////// @@ -1491,7 +1480,7 @@ bool ActiveMQConnection::isDispatchAsync } //////////////////////////////////////////////////////////////////////////////// -void ActiveMQConnection::setDispatchAsync( bool value ) { +void ActiveMQConnection::setDispatchAsync(bool value) { this->config->dispatchAsync = value; } @@ -1675,7 +1664,7 @@ void ActiveMQConnection::deleteTempDesti this->config->sessionsLock.readLock().lock(); try { - Pointer< Iterator< Pointer > > iterator(this->config->activeSessions.iterator()); + Pointer > > iterator(this->config->activeSessions.iterator()); while (iterator->hasNext()) { Pointer session = iterator->next(); if (session->isInUse(destination)) { @@ -1695,16 +1684,16 @@ void ActiveMQConnection::deleteTempDesti command->setConnectionId(this->config->connectionInfo->getConnectionId()); command->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION); - command->setDestination(Pointer (destination->cloneDataStructure())); + command->setDestination(Pointer(destination->cloneDataStructure())); // Send the message to the broker. syncRequest(command); } - AMQ_CATCH_RETHROW( NullPointerException ) - AMQ_CATCH_RETHROW( decaf::lang::exceptions::IllegalStateException ) - AMQ_CATCH_RETHROW( ActiveMQException ) - AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) - AMQ_CATCHALL_THROW( ActiveMQException ) + AMQ_CATCH_RETHROW(NullPointerException) + AMQ_CATCH_RETHROW(decaf::lang::exceptions::IllegalStateException) + AMQ_CATCH_RETHROW(ActiveMQException) + AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException) + AMQ_CATCHALL_THROW(ActiveMQException) } //////////////////////////////////////////////////////////////////////////////// @@ -1714,8 +1703,7 @@ void ActiveMQConnection::cleanUpTempDest return; } - Pointer< Iterator< Pointer< ActiveMQTempDestination> > > iterator( - this->config->activeTempDestinations.values().iterator()); + Pointer > > iterator(this->config->activeTempDestinations.values().iterator()); while (iterator->hasNext()) { Pointer dest = iterator->next(); @@ -1724,13 +1712,13 @@ void ActiveMQConnection::cleanUpTempDest // Only delete this temporary destination if it was created from this connection, since the // advisory consumer tracks all temporary destinations there can be others in our mapping that // this connection did not create. - std::string thisConnectionId = this->config->connectionInfo->getConnectionId() != NULL ? - this->config->connectionInfo->getConnectionId()->toString() : ""; + std::string thisConnectionId = + this->config->connectionInfo->getConnectionId() != NULL ? this->config->connectionInfo->getConnectionId()->toString() : ""; if (dest->getConnectionId() == thisConnectionId) { this->deleteTempDestination(dest); } - } catch(Exception& ex) { + } catch (Exception& ex) { } } }