qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From acon...@apache.org
Subject svn commit: r964213 - in /qpid/trunk/qpid/cpp/src/qpid: client/ConnectionImpl.cpp client/ConnectionImpl.h sys/posix/AsynchIO.cpp
Date Wed, 14 Jul 2010 21:33:09 GMT
Author: aconway
Date: Wed Jul 14 21:33:09 2010
New Revision: 964213

URL: http://svn.apache.org/viewvc?rev=964213&view=rev
Log:
Fix read-credit bug causing cluster brokers to disconnect clients sporadically.

Also added connection identifier in connection log messages.

Modified:
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
    qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
    qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp?rev=964213&r1=964212&r2=964213&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.cpp Wed Jul 14 21:33:09 2010
@@ -236,7 +236,7 @@ void ConnectionImpl::incoming(framing::A
         s = sessions[frame.getChannel()].lock();
     }
     if (!s) {
-        QPID_LOG(info, "Dropping frame received on invalid channel: " << frame);
+        QPID_LOG(info, *this << " dropping frame received on invalid channel: " <<
frame);
     } else {
         s->in(frame);
     }
@@ -252,7 +252,6 @@ void ConnectionImpl::open()
     const std::string& protocol = handler.protocol;
     const std::string& host = handler.host;
     int port = handler.port;
-    QPID_LOG(info, "Connecting to " << protocol << ":" << host <<
":" << port);
 
     theIO().add();
     connector.reset(Connector::create(protocol, theIO().poller(), version, handler, this));
@@ -267,6 +266,7 @@ void ConnectionImpl::open()
         throw;
     }
     connector->init();
+    QPID_LOG(info, *this << " connected to " << protocol << ":" <<
host << ":" << port);
  
     // Enable heartbeat if requested
     uint16_t heartbeat = static_cast<ConnectionSettings&>(handler).heartbeat;
@@ -291,10 +291,10 @@ void ConnectionImpl::open()
     //enable security layer if one has been negotiated:
     std::auto_ptr<SecurityLayer> securityLayer = handler.getSecurityLayer();
     if (securityLayer.get()) {
-        QPID_LOG(debug, "Activating security layer");
+        QPID_LOG(debug, *this << " activating security layer");
         connector->activateSecurityLayer(securityLayer);
     } else {
-        QPID_LOG(debug, "No security layer in place");
+        QPID_LOG(debug, *this << " no security layer in place");
     }
 }
 
@@ -401,17 +401,20 @@ void ConnectionImpl::failedConnection() 
     bool isClosing = handler.isClosing();
     bool isOpen = handler.isOpen();
 
+    std::ostringstream msg;
+    msg << *this << " closed";
+
     // FIXME aconway 2008-06-06: exception use, amqp0-10 does not seem to have
     // an appropriate close-code. connection-forced is not right.
-    handler.fail(CONN_CLOSED);//ensure connection is marked as failed before notifying sessions
+    handler.fail(msg.str());//ensure connection is marked as failed before notifying sessions
 
     // At this point if the object isn't open and isn't closing it must have failed to open
     // so we can't do the rest of the cleanup
     if (!isClosing && !isOpen) return;
 
     Mutex::ScopedLock l(lock);
-    closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, CONN_CLOSED));
-    setException(new TransportFailure(CONN_CLOSED));
+    closeInternal(boost::bind(&SessionImpl::connectionBroke, _1, msg.str()));
+    setException(new TransportFailure(msg.str()));
 }
 
 void ConnectionImpl::erase(uint16_t ch) {
@@ -435,4 +438,12 @@ boost::shared_ptr<SessionImpl>  Connecti
     return simpl;
 }
 
+std::ostream& operator<<(std::ostream& o, const ConnectionImpl& c) {
+    if (c.connector)
+        return o << "Connection " << c.connector->getIdentifier();
+    else
+        return o << "Connection <not connected>";
+}
+
+
 }} // namespace qpid::client

Modified: qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h?rev=964213&r1=964212&r2=964213&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/ConnectionImpl.h Wed Jul 14 21:33:09 2010
@@ -31,6 +31,7 @@
 #include "qpid/sys/TimeoutHandler.h"
 
 #include <map>
+#include <iosfwd>
 #include <boost/shared_ptr.hpp>
 #include <boost/weak_ptr.hpp>
 #include <boost/scoped_ptr.hpp>
@@ -95,8 +96,9 @@ class ConnectionImpl : public Bounds,
 
     std::vector<Url> getInitialBrokers();
     void registerFailureCallback ( boost::function<void ()> fn ) { failureCallback
= fn; }
-
     framing::ProtocolVersion getVersion() { return version; }
+
+  friend std::ostream& operator<<(std::ostream&, const ConnectionImpl&);
 };
 
 }}

Modified: qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp?rev=964213&r1=964212&r2=964213&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/sys/posix/AsynchIO.cpp Wed Jul 14 21:33:09 2010
@@ -400,10 +400,14 @@ AsynchIO::BufferBase* AsynchIO::getQueue
 }
 
 /*
- * We keep on reading as long as we have something to read and a buffer to put
- * it in
+ * We keep on reading as long as we have something to read, a buffer
+ * to put it in and reading is not stopped by flow control.
  */
 void AsynchIO::readable(DispatchHandle& h) {
+    if (readingStopped) {
+        // We have been flow controlled.
+        return;
+    }
     int readTotal = 0;
     AbsTime readStartTime = AbsTime::now();
     do {



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org


Mime
View raw message