activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1449963 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
Date Mon, 25 Feb 2013 23:36:01 GMT
Author: tabish
Date: Mon Feb 25 23:36:01 2013
New Revision: 1449963

URL: http://svn.apache.org/r1449963
Log:
fix for: https://issues.apache.org/jira/browse/AMQCPP-465

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp?rev=1449963&r1=1449962&r2=1449963&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/decaf/internal/net/tcp/TcpSocket.cpp Mon Feb 25 23:36:01 2013
@@ -25,6 +25,7 @@
 #include <decaf/net/SocketOptions.h>
 #include <decaf/lang/Character.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <stdlib.h>
 #include <string>
 #include <stdio.h>
@@ -59,6 +60,7 @@ using namespace decaf::net;
 using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
+using namespace decaf::util::concurrent::atomic;
 
 ////////////////////////////////////////////////////////////////////////////////
 namespace decaf {
@@ -71,19 +73,22 @@ namespace tcp {
 
         decaf::internal::AprPool apr_pool;
         apr_socket_t* socketHandle;
+        bool handleIsRemote;
         apr_sockaddr_t* localAddress;
         apr_sockaddr_t* remoteAddress;
         TcpSocketInputStream* inputStream;
         TcpSocketOutputStream* outputStream;
         bool inputShutdown;
         bool outputShutdown;
-        volatile bool closed;
+        AtomicBoolean closed;
+        bool connected;
         int trafficClass;
         int soTimeout;
         int soLinger;
 
         TcpSocketImpl() : apr_pool(),
                           socketHandle(NULL),
+                          handleIsRemote(false),
                           localAddress(NULL),
                           remoteAddress(NULL),
                           inputStream(NULL),
@@ -91,6 +96,7 @@ namespace tcp {
                           inputShutdown(false),
                           outputShutdown(false),
                           closed(false),
+                          connected(false),
                           trafficClass(0),
                           soTimeout(-1),
                           soLinger(-1) {
@@ -128,6 +134,10 @@ TcpSocket::~TcpSocket() {
     DECAF_CATCHALL_NOTHROW()
 
     try {
+        if (!this->impl->handleIsRemote && this->impl->socketHandle != NULL) {
+            apr_socket_close(this->impl->socketHandle);
+        }
+
         delete this->impl;
     }
     DECAF_CATCHALL_NOTHROW()
@@ -165,16 +175,18 @@ void TcpSocket::accept(SocketImpl* socke
             throw IOException(__FILE__, __LINE__, "SocketImpl instance passed was null.");
         }
 
-        TcpSocket* impl = dynamic_cast<TcpSocket*>(socket);
+        TcpSocket* tcpSocket = dynamic_cast<TcpSocket*>(socket);
         if (impl == NULL) {
             throw IOException(__FILE__, __LINE__, "SocketImpl instance passed was not a TcpSocket.");
         }
 
         apr_status_t result = APR_SUCCESS;
 
+        tcpSocket->impl->handleIsRemote = true;
+
         // Loop to ignore any signal interruptions that occur during the operation.
         do {
-            result = apr_socket_accept(&impl->impl->socketHandle,
+            result = apr_socket_accept(&tcpSocket->impl->socketHandle,
                                        this->impl->socketHandle,
                                        this->impl->apr_pool.getAprPool());
         } while (result == APR_EINTR);
@@ -183,6 +195,9 @@ void TcpSocket::accept(SocketImpl* socke
             throw SocketException(__FILE__, __LINE__,
                 "ServerSocket::accept - %s", SocketError::getErrorString().c_str());
         }
+
+        // the socketHandle will have been allocated in the apr_pool of the ServerSocket.
+        tcpSocket->impl->connected = true;
     }
     DECAF_CATCH_RETHROW(decaf::io::IOException)
     DECAF_CATCH_EXCEPTION_CONVERT(Exception, decaf::io::IOException)
@@ -192,7 +207,7 @@ void TcpSocket::accept(SocketImpl* socke
 ////////////////////////////////////////////////////////////////////////////////
 InputStream* TcpSocket::getInputStream() {
 
-    if (this->impl->socketHandle == NULL || this->impl->closed) {
+    if (this->impl->socketHandle == NULL || isClosed()) {
         throw IOException(__FILE__, __LINE__, "The Socket is not Connected.");
     }
 
@@ -216,7 +231,7 @@ InputStream* TcpSocket::getInputStream()
 ////////////////////////////////////////////////////////////////////////////////
 OutputStream* TcpSocket::getOutputStream() {
 
-    if (this->impl->socketHandle == NULL || this->impl->closed) {
+    if (this->impl->socketHandle == NULL || isClosed()) {
         throw IOException(__FILE__, __LINE__, "The Socket is not Connected.");
     }
 
@@ -322,6 +337,7 @@ void TcpSocket::connect(const std::strin
 
         // Now that we connected, cache the port value for later lookups.
         this->port = port;
+        this->impl->connected = true;
 
     } catch (IOException& ex) {
         ex.setMark(__FILE__, __LINE__);
@@ -454,33 +470,29 @@ void TcpSocket::close() {
 
     try {
 
-        if (this->impl->closed) {
-            return;
-        }
+        if (this->impl->closed.compareAndSet(false, true)) {
+            this->impl->connected = false;
 
-        this->impl->closed = true;
-
-        // Destroy the input stream.
-        if (impl->inputStream != NULL) {
-            impl->inputStream->close();
-        }
-
-        // Destroy the output stream.
-        if (impl->outputStream != NULL) {
-            impl->outputStream->close();
-        }
+            // Destroy the input stream.
+            if (impl->inputStream != NULL) {
+                impl->inputStream->close();
+            }
 
-        // When connected we first shutdown, which breaks our reads and writes
-        // then we close to free APR resources.
-        if (isConnected()) {
-            apr_socket_shutdown(impl->socketHandle, APR_SHUTDOWN_READWRITE);
-            apr_socket_close(this->impl->socketHandle);
-            delete this->fd;
+            // Destroy the output stream.
+            if (impl->outputStream != NULL) {
+                impl->outputStream->close();
+            }
 
-            // Clear out the member data.
-            this->impl->socketHandle = NULL;
-            this->port = 0;
-            this->localPort = 0;
+            // When connected we first shutdown, which breaks our reads and writes
+            // then we close to free APR resources.
+            if (this->impl->socketHandle != NULL) {
+                apr_socket_shutdown(impl->socketHandle, APR_SHUTDOWN_READWRITE);
+
+                // Member data from parent
+                delete this->fd;
+                this->port = 0;
+                this->localPort = 0;
+            }
         }
     }
     DECAF_CATCH_RETHROW(decaf::io::IOException)
@@ -624,7 +636,7 @@ void TcpSocket::checkResult(apr_status_t
 int TcpSocket::read(unsigned char* buffer, int size, int offset, int length) {
 
     try {
-        if (this->isClosed()) {
+        if (isClosed()) {
             throw IOException(__FILE__, __LINE__, "The Stream has been closed");
         }
 
@@ -665,7 +677,7 @@ int TcpSocket::read(unsigned char* buffe
         // Check for EOF, on windows we only get size==0 so check that to, if we
         // were closed though then we throw an IOException so the caller knows we
         // aren't usable anymore.
-        if ((APR_STATUS_IS_EOF( result ) || aprSize == 0) && !impl->closed) {
+        if ((APR_STATUS_IS_EOF( result ) || aprSize == 0) && !isClosed()) {
             this->impl->inputShutdown = true;
             return -1;
         }
@@ -701,7 +713,7 @@ void TcpSocket::write(const unsigned cha
                 "TcpSocket::write - passed buffer is null");
         }
 
-        if (this->impl->closed) {
+        if (isClosed()) {
             throw IOException(__FILE__, __LINE__,
                 "TcpSocket::write - This Stream has been closed.");
         }
@@ -726,13 +738,13 @@ void TcpSocket::write(const unsigned cha
 
         const unsigned char* lbuffer = buffer + offset;
 
-        while (remaining > 0 && !this->impl->closed) {
+        while (remaining > 0 && !isClosed()) {
 
             // On input remaining is the bytes to send, after return remaining
             // is the amount actually sent.
             result = apr_socket_send(this->impl->socketHandle, (const char*) lbuffer, &remaining);
 
-            if (result != APR_SUCCESS || this->impl->closed) {
+            if (result != APR_SUCCESS || isClosed()) {
                 throw IOException(__FILE__, __LINE__,
                     "TcpSocketOutputStream::write - %s", SocketError::getErrorString().c_str());
             }
@@ -750,10 +762,10 @@ void TcpSocket::write(const unsigned cha
 
 ////////////////////////////////////////////////////////////////////////////////
 bool TcpSocket::isConnected() const {
-    return this->impl->socketHandle != NULL;
+    return this->impl->connected;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool TcpSocket::isClosed() const {
-    return this->impl->closed;
+    return this->impl->closed.get();
 }



Mime
View raw message