qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From g...@apache.org
Subject svn commit: r710106 - in /incubator/qpid/trunk/qpid/cpp/src: qpid/ qpid/client/ tests/
Date Mon, 03 Nov 2008 17:21:38 GMT
Author: gsim
Date: Mon Nov  3 09:21:38 2008
New Revision: 710106

URL: http://svn.apache.org/viewvc?rev=710106&view=rev
Log:
Various fixes arising from testing client failover:

* introduced new exception type for signalling connection failure (as distinct from any logical
connection errors)

* ConnectionImpl::closeInternal(): take copy of session map to prevent concurrent modification
(by the same thread) as sessions are deleted and erase themselves.

* ConnectionImpl::shutdown: hold lock before calling closeInternal(); mark handler failed
before informing sessions of failure

* SessionImpl::connectionBroker(): remove code as its rather meaningless

* Don't swallow exceptions in Dispatcher

* Handle exceptions in FailoverListener

* Take weak_ptr to ConnectionImpl on constructor of Connector, then convert to shared_ptr
when 'receiver' thread is started.


Modified:
    incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
    incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
    incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/Exception.h Mon Nov  3 09:21:38 2008
@@ -80,6 +80,13 @@
     std::string getPrefix() const;
 };
 
+/**
+ * Exception representing transport failure
+ */
+struct TransportFailure : public Exception {
+    TransportFailure(const std::string& msg=std::string()) : Exception(msg) {}
+};
+
 } // namespace qpid
  
 #endif  /*!_Exception_*/

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionHandler.cpp Mon Nov  3 09:21:38
2008
@@ -89,7 +89,7 @@
     if (getState() == OPEN) 
         out(frame);
     else
-        throw Exception(errorText.empty() ? "Connection is not open." : errorText);
+        throw TransportFailure(errorText.empty() ? "Connection is not open." : errorText);
 }
 
 void ConnectionHandler::waitForOpen()

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Mon Nov  3 09:21:38 2008
@@ -129,17 +129,22 @@
 {
     if (!handler.isOpen()) return;
     handler.close();
-    closed(CLOSE_CODE_NORMAL, "Closed by client");
 }
 
 
 template <class F> void ConnectionImpl::closeInternal(const F& f) {
     connector->close();
-    for (SessionMap::iterator i = sessions.begin(); i != sessions.end(); ++i) {
+    //notifying sessions of failure can result in those session being
+    //deleted which in turn results in a call to erase(); this can
+    //even happen on this thread, when 's' goes out of scope
+    //below. Using a copy prevents the map being modified as we
+    //iterate through.
+    SessionMap copy;
+    sessions.swap(copy);
+    for (SessionMap::iterator i = copy.begin(); i != copy.end(); ++i) {
         boost::shared_ptr<SessionImpl> s = i->second.lock();
         if (s) f(s);
     }
-    sessions.clear();
 }
 
 void ConnectionImpl::closed(uint16_t code, const std::string& text) { 
@@ -148,7 +153,7 @@
     closeInternal(boost::bind(&SessionImpl::connectionClosed, _1, code, text));
 }
 
-static const std::string CONN_CLOSED("Connection closed by broker");
+static const std::string CONN_CLOSED("Connection closed");
 
 void ConnectionImpl::shutdown() {
     if ( failureCallback )
@@ -158,10 +163,12 @@
 
     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
     // an appropriate close-code. connection-forced is not right.
-    if (!handler.isClosing()) 
-        closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CLOSE_CODE_CONNECTION_FORCED,
CONN_CLOSED));
-    setException(new ConnectionException(CLOSE_CODE_CONNECTION_FORCED, CONN_CLOSED));
-    handler.fail(CONN_CLOSED);
+    bool isClosing = handler.isClosing();
+    handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions
+    Mutex::ScopedLock l(lock);
+    if (!isClosing) 
+        closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED));
+    setException(new TransportFailure(CONN_CLOSED));
 }
 
 void ConnectionImpl::erase(uint16_t ch) {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Connector.cpp Mon Nov  3 09:21:38 2008
@@ -35,6 +35,7 @@
 #include <map>
 #include <boost/bind.hpp>
 #include <boost/format.hpp>
+#include <boost/weak_ptr.hpp>
 
 namespace qpid {
 namespace client {
@@ -140,7 +141,7 @@
 
     std::string identifier;
 
-    ConnectionImpl* impl;
+    boost::weak_ptr<ConnectionImpl> impl;
     
     void connect(const std::string& host, int port);
     void init();
@@ -183,7 +184,7 @@
       shutdownHandler(0),
       writer(maxFrameSize, cimpl),
       aio(0),
-      impl(cimpl)
+      impl(cimpl->shared_from_this())
 {
     QPID_LOG(debug, "TCPConnector created for " << version.toString());
     settings.configureSocket(socket);
@@ -380,7 +381,7 @@
 // will never be called
 void TCPConnector::run(){
     // Keep the connection impl in memory until run() completes.
-    boost::shared_ptr<ConnectionImpl> protect = impl->shared_from_this();
+    boost::shared_ptr<ConnectionImpl> protect = impl.lock();
     assert(protect);
     try {
         Dispatcher d(poller);

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/Dispatcher.cpp Mon Nov  3 09:21:38 2008
@@ -91,9 +91,9 @@
         if ( failoverHandler ) {
             QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what()));
             failoverHandler();
-        }
-        else {
+        } else {
             QPID_LOG(error, session.getId() << " error: " << e.what());
+            throw;
         }
     }
 }

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.cpp Mon Nov  3 09:21:38
2008
@@ -61,7 +61,17 @@
     session.queueDeclare(arg::queue=qname, arg::exclusive=true, arg::autoDelete=true);
     session.exchangeBind(arg::queue=qname, arg::exchange=AMQ_FAILOVER);
     subscriptions->subscribe(*this, qname, SubscriptionSettings(FlowControl::unlimited(),
ACCEPT_MODE_NONE));
-    thread = sys::Thread(*subscriptions);
+    thread = sys::Thread(*this);
+}
+
+void FailoverListener::run() 
+{
+    try {
+        subscriptions->run();
+    } catch (const TransportFailure&) {
+    } catch (const std::exception& e) {
+        QPID_LOG(error, QPID_MSG(e.what()));
+    }
 }
 
 FailoverListener::~FailoverListener() {

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/FailoverListener.h Mon Nov  3 09:21:38 2008
@@ -25,6 +25,7 @@
 #include "qpid/client/MessageListener.h"
 #include "qpid/Url.h"
 #include "qpid/sys/Mutex.h"
+#include "qpid/sys/Runnable.h"
 #include "qpid/sys/Thread.h"
 #include <vector>
 
@@ -36,7 +37,8 @@
 /**
  * @internal Listen for failover updates from the amq.failover exchange.
  */
-class FailoverListener : public MessageListener {
+class FailoverListener : public MessageListener, private qpid::sys::Runnable 
+{
   public:
     FailoverListener(const boost::shared_ptr<ConnectionImpl>&, const std::vector<Url>&
initUrls);
     ~FailoverListener();
@@ -44,6 +46,7 @@
 
     std::vector<Url> getKnownBrokers() const;
     void received(Message& msg);
+    void run();
     
   private:
     mutable sys::Mutex lock;

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Mon Nov  3 09:21:38 2008
@@ -260,8 +260,9 @@
  * Called by ConnectionImpl to notify active sessions when connection
  * is disconnected
  */
-void SessionImpl::connectionBroke(uint16_t _code, const std::string& _text) {
-    connectionClosed(_code, _text);
+void SessionImpl::connectionBroke(const std::string& _text) {
+    setException(sys::ExceptionHolder(new TransportFailure(_text)));
+    handleClosed();
 }
 
 Future SessionImpl::send(const AMQBody& command)

Modified: incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ incubator/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Mon Nov  3 09:21:38 2008
@@ -99,7 +99,7 @@
     
     //NOTE: these are called by the network thread when the connection is closed or dies
     void connectionClosed(uint16_t code, const std::string& text);
-    void connectionBroke(uint16_t code, const std::string& text);
+    void connectionBroke(const std::string& text);
 
     /** Set timeout in seconds, returns actual timeout allowed by broker */ 
     uint32_t setTimeout(uint32_t requestedSeconds);

Modified: incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp?rev=710106&r1=710105&r2=710106&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp (original)
+++ incubator/qpid/trunk/qpid/cpp/src/tests/exception_test.cpp Mon Nov  3 09:21:38 2008
@@ -92,7 +92,7 @@
     ProxyConnection c(fix.broker->getPort(Broker::TCP_TRANSPORT));
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(fix.lq, "q");
-    Catcher<ConnectionException> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
+    Catcher<TransportFailure> pop(bind(&LocalQueue::pop, &fix.lq, sys::TIME_SEC));
     fix.connection.proxy.close();
     BOOST_CHECK(pop.join());
 }
@@ -106,11 +106,10 @@
     fix.session.queueDeclare(arg::queue="q");
     fix.subs.subscribe(l, "q");
 
-    ScopedSuppressLogging sl; // Suppress messages for expected errors.
-    Thread t(fix.subs);
+    Catcher<TransportFailure> runner(bind(&SubscriptionManager::run, boost::ref(fix.subs)));
     fix.connection.proxy.close();
-    t.join();
-    BOOST_CHECK_THROW(fix.session.close(), ConnectionException);    
+    runner.join();
+    BOOST_CHECK_THROW(fix.session.close(), TransportFailure);    
 }
 
 QPID_AUTO_TEST_CASE(NoSuchQueueTest) {



Mime
View raw message