qpid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From c...@apache.org
Subject svn commit: r1544277 - in /qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp: ./ TcpTransport.cpp TcpTransport.h
Date Thu, 21 Nov 2013 18:24:09 GMT
Author: chug
Date: Thu Nov 21 18:24:09 2013
New Revision: 1544277

URL: http://svn.apache.org/r1544277
Log:
QPID-5363: Add locks to prevent race condition in Amqp 1.0 transport - merge from trunk


Modified:
    qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/   (props changed)
    qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
    qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h

Propchange: qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/
------------------------------------------------------------------------------
--- svn:mergeinfo (added)
+++ svn:mergeinfo Thu Nov 21 18:24:09 2013
@@ -0,0 +1,8 @@
+/qpid/branches/0.5.x-dev/qpid/cpp/src/qpid/messaging/amqp:892761,894875
+/qpid/branches/0.6-release-windows-installer/cpp/src/qpid/messaging/amqp:926803
+/qpid/branches/0.6-release-windows-installer/qpid/cpp/src/qpid/messaging/amqp:926803,927233
+/qpid/branches/QPID-2519/cpp/src/qpid/messaging/amqp:1072051-1079078
+/qpid/branches/java-network-refactor/qpid/cpp/src/qpid/messaging/amqp:805429-825319
+/qpid/branches/qpid-2935/qpid/cpp/src/qpid/messaging/amqp:1061302-1072333
+/qpid/branches/qpid-3346/qpid/cpp/src/qpid/messaging/amqp:1144319-1179855
+/qpid/trunk/qpid/cpp/src/qpid/messaging/amqp:1542010,1543018-1543019,1543935

Modified: qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp?rev=1544277&r1=1544276&r2=1544277&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp (original)
+++ qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.cpp Thu Nov 21 18:24:09
2013
@@ -48,7 +48,7 @@ struct StaticInit
 } init;
 }
 
-TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) :
socket(createSocket()), context(c), connector(0), aio(0), poller(p) {}
+TcpTransport::TcpTransport(TransportContext& c, boost::shared_ptr<Poller> p) :
socket(createSocket()), context(c), connector(0), aio(0), poller(p), closed(false) {}
 
 void TcpTransport::connect(const std::string& host, const std::string& port)
 {
@@ -66,6 +66,7 @@ void TcpTransport::connect(const std::st
 void TcpTransport::failed(const std::string& msg)
 {
     QPID_LOG(debug, "Failed to connect: " << msg);
+    closed = true;
     connector = 0;
     socket->close();
     context.closed();
@@ -118,9 +119,12 @@ void TcpTransport::write(AsynchIO&)
 
 void TcpTransport::close()
 {
-    QPID_LOG(debug, id << " TcpTransport closing...");
-    if (aio)
-        aio->queueWriteClose();
+    qpid::sys::Mutex::ScopedLock l(lock);
+    if (!closed) {
+        QPID_LOG(debug, id << " TcpTransport closing...");
+        if (aio)
+            aio->queueWriteClose();
+    }
 }
 
 void TcpTransport::eof(AsynchIO&)
@@ -136,31 +140,44 @@ void TcpTransport::disconnected(AsynchIO
 
 void TcpTransport::socketClosed(AsynchIO&, const Socket&)
 {
-    if (aio)
-        aio->queueForDeletion();
-    context.closed();
-    QPID_LOG(debug, id << " Socket closed");
+    bool notify(false);
+    {
+        qpid::sys::Mutex::ScopedLock l(lock);
+        if (!closed) {
+            closed = true;
+            if (aio)
+                aio->queueForDeletion();
+            QPID_LOG(debug, id << " Socket closed");
+            notify = true;
+        } //else has already been closed
+    }
+    if (notify) context.closed();
 }
 
 void TcpTransport::abort()
 {
-    if (aio) {
-        // Established connection
-        aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1));
-    } else if (connector) {
-        // We're still connecting
-        connector->stop();
-        failed("Connection timedout");
+    qpid::sys::Mutex::ScopedLock l(lock);
+    if (!closed) {
+        if (aio) {
+            // Established connection
+            aio->requestCallback(boost::bind(&TcpTransport::eof, this, _1));
+        } else if (connector) {
+            // We're still connecting
+            connector->stop();
+            failed("Connection timedout");
+        }
     }
 }
 
 void TcpTransport::activateOutput()
 {
-    if (aio) aio->notifyPendingWrite();
+    qpid::sys::Mutex::ScopedLock l(lock);
+    if (!closed && aio) aio->notifyPendingWrite();
 }
 
 const qpid::sys::SecuritySettings* TcpTransport::getSecuritySettings()
 {
     return 0;
 }
+
 }}} // namespace qpid::messaging::amqp

Modified: qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h
URL: http://svn.apache.org/viewvc/qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h?rev=1544277&r1=1544276&r2=1544277&view=diff
==============================================================================
--- qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h (original)
+++ qpid/branches/0.26/qpid/cpp/src/qpid/messaging/amqp/TcpTransport.h Thu Nov 21 18:24:09
2013
@@ -59,6 +59,8 @@ class TcpTransport : public Transport
     qpid::sys::AsynchIO* aio;
     boost::shared_ptr<qpid::sys::Poller> poller;
     std::string id;
+    bool closed;
+    qpid::sys::Mutex lock;
 
     void connected(const qpid::sys::Socket&);
     void failed(const std::string& msg);



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org


Mime
View raw message