Mailing-List: contact qpid-commits-help@incubator.apache.org; run by ezmlm
Reply-To: qpid-dev@incubator.apache.org
Subject: svn commit: r710106 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/ qpid/client/ tests/
Date: Mon, 03 Nov 2008 17:21:38 -0000
To: qpid-commits@incubator.apache.org
From: gsim@apache.org
Message-Id: <20081103172139.9F032238889E@eris.apache.org>

Author: gsim
Date: Mon Nov 3 09:21:38 2008
New Revision: 710106

URL: http://svn.apache.org/viewvc?rev=710106&view=rev
Log:
Various fixes arising from testing client failover:
* introduced new exception type for signalling connection failure (as distinct from any logical connection errors)
* ConnectionImpl::closeInternal(): take copy of session map to prevent concurrent modification (by the same thread) as sessions are deleted and erase themselves.
* ConnectionImpl::shutdown: hold lock before calling closeInternal(); mark handler failed before informing sessions of failure
* SessionImpl::connectionBroke(): remove code argument as it's rather meaningless
* Don't swallow exceptions in Dispatcher
* Handle exceptions in FailoverListener
* Take weak_ptr to ConnectionImpl in constructor of Connector, then convert to shared_ptr when 'receiver' thread is started.

Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Mon Nov 3 09:21:38 2008
@@ -80,6 +80,13 @@
     std::string getPrefix() const;
 };

+/**
+ * Exception representing transport failure
+ */
+struct TransportFailure : public Exception {
+    TransportFailure(const std::string& msg=std::string()) : Exception(msg) {}
+};
+
 } // namespace qpid

 #endif /*!_Exception_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Nov 3 09:21:38 2008
@@ -89,7 +89,7 @@
     if (getState() == OPEN)
         out(frame);
     else
-        throw Exception(errorText.empty() ? "Connection is not open." : errorText);
+        throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
 }

 void ConnectionHandler::waitForOpen()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Nov 3 09:21:38 2008
@@ -129,17 +129,22 @@
 {
     if (!handler.isOpen()) return;
     handler.close();
-    closed(CLOSE_CODE_NORMAL, "Closed by client");
 }

 template void ConnectionImpl::closeInternal(const F& f) {
     connector->close();
-    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+    //notifying sessions of failure can result in those session being
+    //deleted which in turn results in a call to erase(); this can
+    //even happen on this thread, when 's' goes out of scope
+    //below. Using a copy prevents the map being modified as we
+    //iterate through.
+    SessionMap copy;
+    sessions.swap(copy);
+    for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) {
         boost::shared_ptr s = i->second.lock();
         if (s) f(s);
     }
-    sessions.clear();
 }

 void ConnectionImpl::closed(uint16_t code, const std::string& text) {
@@ -148,7 +153,7 @@
     closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
 }

-static const std::string CONN_CLOSED("Connection closed by broker");
+static const std::string CONN_CLOSED("Connection closed");

 void ConnectionImpl::shutdown() {
     if ( failureCallback )
@@ -158,10 +163,12 @@

     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
     // an appropriate close-code. connection-forced is not right.
-    if (!handler.isClosing())
-        closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
-    setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
-    handler.fail(CONN_CLOSED);
+    bool isClosing = handler.isClosing();
+    handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions
+    Mutex::ScopedLock l(lock);
+    if (!isClosing)
+        closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED));
+    setException(new TransportFailure(CONN_CLOSED));
 }

 void ConnectionImpl::erase(uint16_t ch) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Mon Nov 3 09:21:38 2008
@@ -35,6 +35,7 @@
 #include
 #include
 #include
+#include

 namespace qpid {
 namespace client {
@@ -140,7 +141,7 @@

     std::string identifier;

-    ConnectionImpl* impl;
+    boost::weak_ptr impl;

     void connect(const std::string& host, int port);
     void init();
@@ -183,7 +184,7 @@
       shutdownHandler(0),
       writer(maxFrameSize, cimpl),
       aio(0),
-      impl(cimpl)
+      impl(cimpl->shared_from_this())
 {
     QPID_LOG(debug, "TCPConnector created for " << version.toString());
     settings.configureSocket(socket);
@@ -380,7 +381,7 @@
 // will never be called
 void TCPConnector::run(){
     // Keep the connection impl in memory until run() completes.
-    boost::shared_ptr protect = impl->shared_from_this();
+    boost::shared_ptr protect = impl.lock();
     assert(protect);
     try {
         Dispatcher d(poller);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Nov 3 09:21:38 2008
@@ -91,9 +91,9 @@
             if ( failoverHandler ) {
                 QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what()));
                 failoverHandler();
-            }
-            else {
+            } else {
                 QPID_LOG(error, session.getId() << " error: " << e.what());
+                throw;
             }
         }
     }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Mon Nov 3 09:21:38 2008
@@ -61,7 +61,17 @@
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
     subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(), ACCEPT_MODE_NONE));
-    thread = sys::Thread(*subscriptions);
+    thread = sys::Thread(*this);
+}
+
+void FailoverListener::run()
+{
+    try {
+        subscriptions->run();
+    } catch (const TransportFailure&) {
+    } catch (const std::exception& e) {
+        QPID_LOG(error, QPID_MSG(e.what()));
+    }
 }

 FailoverListener::~FailoverListener() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Mon Nov 3 09:21:38 2008
@@ -25,6 +25,7 @@
 #include "qpid/client/MessageListener.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include

@@ -36,7 +37,8 @@
 /**
  * @internal Listen for failover updates from the amq.failover exchange.
  */
-class FailoverListener : public MessageListener {
+class FailoverListener : public MessageListener, private qpid::sys::Runnable
+{
   public:
     FailoverListener(const boost::shared_ptr&, const std::vector& initUrls);
     ~FailoverListener();
@@ -44,6 +46,7 @@
     std::vector getKnownBrokers() const;

     void received(Message& msg);
+    void run();

   private:
     mutable sys::Mutex lock;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Nov 3 09:21:38 2008
@@ -260,8 +260,9 @@
  * Called by ConnectionImpl to notify active sessions when connection
  * is disconnected
  */
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
-    connectionClosed(_code, _text);
+void SessionImpl::connectionBroke(const std::string& _text) {
+    setException(sys::ExceptionHolder(new TransportFailure(_text)));
+    handleClosed();
 }

 Future SessionImpl::send(const AMQBody& command)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Nov 3 09:21:38 2008
@@ -99,7 +99,7 @@

     //NOTE: these are called by the network thread when the connection is closed or dies
     void connectionClosed(uint16_t code, const std::string& text);
-    void connectionBroke(uint16_t code, const std::string& text);
+    void connectionBroke(const std::string& text);

     /** Set timeout in seconds, returns actual timeout allowed by broker */
     uint32_t setTimeout(uint32_t requestedSeconds);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Nov 3 09:21:38 2008
@@ -92,7 +92,7 @@
     ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(fix.lq, "q");
-    Catcher pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
+    Catcher pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
     fix.connection.proxy.close();
     BOOST_CHECK(pop.join());
 }
@@ -106,11 +106,10 @@
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(l, "q");
-    ScopedSuppressLogging sl; // Suppress messages for expected errors.
-    Thread t(fix.subs);
+    Catcher runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
     fix.connection.proxy.close();
-    t.join();
-    BOOST_CHECK_THROW(fix.session.close(), ConnectionException);
+    runner.join();
+    BOOST_CHECK_THROW(fix.session.close(), TransportFailure);
 }

 QPID_AUTO_TEST_CASE(NoSuchQueueTest) {
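
Note on the closeInternal() change: the log and the in-code comment describe swapping the session map into a local copy so that sessions which erase themselves during notification cannot invalidate the iteration. The following is a minimal standalone sketch of that pattern only. It assumes C++11 std::shared_ptr/std::weak_ptr in place of Boost, and the Registry and Session classes (with notifyAll, add, broke, getChannel) are hypothetical stand-ins for illustration, not the qpid ConnectionImpl/SessionImpl classes.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <map>
#include <memory>
#include <string>

class Registry;  // hypothetical stand-in for ConnectionImpl

// Hypothetical stand-in for SessionImpl: erases itself from the registry
// when it is destroyed.
class Session {
  public:
    Session(Registry& r, uint16_t ch) : registry(r), channel(ch) {}
    ~Session();
    void broke(const std::string& text) { error = text; }
    uint16_t getChannel() const { return channel; }
    const std::string& getError() const { return error; }
  private:
    Registry& registry;
    uint16_t channel;
    std::string error;
};

class Registry {
  public:
    typedef std::map<uint16_t, std::weak_ptr<Session> > SessionMap;

    void add(const std::shared_ptr<Session>& s) { sessions[s->getChannel()] = s; }
    void erase(uint16_t ch) { sessions.erase(ch); }
    std::size_t size() const { return sessions.size(); }

    // Calls f on every live session and returns how many were notified.
    template <class F> std::size_t notifyAll(const F& f) {
        // Move the entries into a local map; the member map is now empty,
        // so any erase() triggered below cannot touch what we iterate over.
        SessionMap copy;
        sessions.swap(copy);
        assert(sessions.empty());
        std::size_t notified = 0;
        for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) {
            std::shared_ptr<Session> s = i->second.lock();
            if (s) { f(s); ++notified; }
            // If 's' was the last owner, ~Session runs here, on this thread,
            // and calls erase() on the (already empty) member map.
        }
        return notified;
    }
  private:
    SessionMap sessions;
};

inline Session::~Session() { registry.erase(channel); }
```

In this sketch a callback may drop the last external reference to a session; the session is then destroyed when the loop's local shared_ptr goes out of scope, and its erase() call lands on the emptied member map rather than on the map being iterated. The sketch leaves out the locking that the shutdown() change adds around closeInternal().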