activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1443781 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport: ./ inactivity/ tcp/
Date Thu, 07 Feb 2013 23:36:01 GMT
Author: tabish
Date: Thu Feb  7 23:36:00 2013
New Revision: 1443781

URL: http://svn.apache.org/r1443781
Log:
https://issues.apache.org/jira/browse/AMQCPP-457

Better control over Transport start and stop.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Thu Feb  7 23:36:00 2013
@@ -18,6 +18,7 @@
 #include "IOTransport.h"
 
 #include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
 #include <activemq/wireformat/WireFormat.h>
 #include <activemq/exceptions/ActiveMQException.h>
@@ -34,18 +35,46 @@ using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 
 ////////////////////////////////////////////////////////////////////////////////
 LOGDECAF_INITIALIZE( logger, IOTransport, "activemq.transport.IOTransport")
 
+namespace activemq {
+namespace transport {
+
+    class IOTransportImpl {
+    private:
+
+        IOTransportImpl(const IOTransportImpl&);
+        IOTransportImpl& operator= (const IOTransportImpl&);
+
+    public:
+
+        Pointer<wireformat::WireFormat> wireFormat;
+        TransportListener* listener;
+        decaf::io::DataInputStream* inputStream;
+        decaf::io::DataOutputStream* outputStream;
+        Pointer<decaf::lang::Thread> thread;
+        AtomicBoolean closed;
+        AtomicBoolean started;
+
+        IOTransportImpl() : wireFormat(), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
+        }
+
+        IOTransportImpl(const Pointer<WireFormat> wireFormat) :
+            wireFormat(wireFormat), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
+        }
+    };
+
+}}
+
 ////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport() :
-    wireFormat(), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
+IOTransport::IOTransport() : impl(new IOTransportImpl()) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-IOTransport::IOTransport(const Pointer<WireFormat> wireFormat) :
-    wireFormat(wireFormat), listener(NULL), inputStream(NULL), outputStream(NULL), thread(), closed(false) {
+IOTransport::IOTransport(const Pointer<WireFormat> wireFormat) : impl(new IOTransportImpl(wireFormat)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -54,14 +83,19 @@ IOTransport::~IOTransport() {
         close();
     }
     AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete this->impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void IOTransport::fire(decaf::lang::Exception& ex) {
 
-    if (this->listener != NULL && !this->closed) {
+    if (this->impl->listener != NULL && this->impl->started.get() && !this->impl->closed.get()) {
         try {
-            this->listener->onException(ex);
+            this->impl->listener->onException(ex);
         } catch (...) {
         }
     }
@@ -74,11 +108,11 @@ void IOTransport::fire(const Pointer<Com
 
         // If we have been closed then we don't deliver any messages that
         // might have sneaked in while we where closing.
-        if (this->listener == NULL || this->closed == true) {
+        if (this->impl->listener == NULL || this->impl->closed.get()) {
             return;
         }
 
-        this->listener->onCommand(command);
+        this->impl->listener->onCommand(command);
     }
     AMQ_CATCHALL_NOTHROW()
 }
@@ -88,12 +122,12 @@ void IOTransport::oneway(const Pointer<C
 
     try {
 
-        if (closed) {
+        if (impl->closed.get()) {
             throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is closed!");
         }
 
         // Make sure the thread has been started.
-        if (thread == NULL) {
+        if (impl->thread == NULL) {
             throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - transport is not started");
         }
 
@@ -103,14 +137,14 @@ void IOTransport::oneway(const Pointer<C
         }
 
         // Make sure we have an output stream to write to.
-        if (outputStream == NULL) {
+        if (impl->outputStream == NULL) {
             throw IOException(__FILE__, __LINE__, "IOTransport::oneway() - invalid output stream");
         }
 
-        synchronized(outputStream) {
+        synchronized(impl->outputStream) {
             // Write the command to the output stream.
-            this->wireFormat->marshal(command, this, this->outputStream);
-            this->outputStream->flush();
+            this->impl->wireFormat->marshal(command, this, this->impl->outputStream);
+            this->impl->outputStream->flush();
         }
     }
     AMQ_CATCH_RETHROW(IOException)
@@ -123,25 +157,22 @@ void IOTransport::start() {
 
     try {
 
-        // Can't restart a closed transport.
-        if (closed) {
-            throw IOException(__FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart");
-        }
+        if (impl->started.compareAndSet(false, true)) {
 
-        // If it's already started, do nothing.
-        if (thread != NULL) {
-            return;
-        }
+            if (impl->closed.get()) {
+                throw IOException(__FILE__, __LINE__, "IOTransport::start() - transport is already closed - cannot restart");
+            }
 
-        // Make sure all variables that we need have been set.
-        if (inputStream == NULL || outputStream == NULL || wireFormat.get() == NULL) {
-            throw IOException(__FILE__, __LINE__, "IOTransport::start() - "
-                    "IO streams and wireFormat instances must be set before calling start");
-        }
+            // Make sure all variables that we need have been set.
+            if (impl->inputStream == NULL || impl->outputStream == NULL || impl->wireFormat.get() == NULL) {
+                throw IOException(__FILE__, __LINE__, "IOTransport::start() - "
+                        "IO streams and wireFormat instances must be set before calling start");
+            }
 
-        // Start the polling thread.
-        thread.reset(new Thread(this, "IOTransport reader Thread"));
-        thread->start();
+            // Start the polling thread.
+            impl->thread.reset(new Thread(this, "IOTransport reader Thread"));
+            impl->thread->start();
+        }
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
@@ -181,54 +212,51 @@ void IOTransport::close() {
 
     try {
 
-        if (closed) {
-            return;
-        }
-
         // Mark this transport as closed.
-        closed = true;
-
-        Finalizer finalize(thread);
+        if (impl->closed.compareAndSet(false, true)) {
 
-        // No need to fire anymore async events now.
-        this->listener = NULL;
+            Finalizer finalize(impl->thread);
 
-        IOException error;
-        bool hasException = false;
+            // No need to fire anymore async events now.
+            this->impl->listener = NULL;
 
-        // We have to close the input stream before we stop the thread.  this will
-        // force us to wake up the thread if it's stuck in a read (which is likely).
-        // Otherwise, the join that follows will block forever.
-        try {
-            if (inputStream != NULL) {
-                inputStream->close();
-                inputStream = NULL;
-            }
-        } catch (IOException& ex) {
-            error = ex;
-            error.setMark(__FILE__, __LINE__);
-            hasException = true;
-        }
+            IOException error;
+            bool hasException = false;
 
-        try {
-            // Close the output stream.
-            if (outputStream != NULL) {
-                outputStream->close();
-                outputStream = NULL;
-            }
-        } catch (IOException& ex) {
-            if (!hasException) {
+            // We have to close the input stream before we stop the thread.  this will
+            // force us to wake up the thread if it's stuck in a read (which is likely).
+            // Otherwise, the join that follows will block forever.
+            try {
+                if (impl->inputStream != NULL) {
+                    impl->inputStream->close();
+                    impl->inputStream = NULL;
+                }
+            } catch (IOException& ex) {
                 error = ex;
                 error.setMark(__FILE__, __LINE__);
                 hasException = true;
             }
-        }
 
-        // Clear the WireFormat so we can't use it anymore
-        this->wireFormat.reset(NULL);
+            try {
+                // Close the output stream.
+                if (impl->outputStream != NULL) {
+                    impl->outputStream->close();
+                    impl->outputStream = NULL;
+                }
+            } catch (IOException& ex) {
+                if (!hasException) {
+                    error = ex;
+                    error.setMark(__FILE__, __LINE__);
+                    hasException = true;
+                }
+            }
+
+            // Clear the WireFormat so we can't use it anymore
+            this->impl->wireFormat.reset(NULL);
 
-        if (hasException) {
-            throw error;
+            if (hasException) {
+                throw error;
+            }
         }
     }
     AMQ_CATCH_RETHROW(IOException)
@@ -241,10 +269,10 @@ void IOTransport::run() {
 
     try {
 
-        while (!closed) {
+        while (!isClosed()) {
 
             // Read the next command from the input stream.
-            Pointer<Command> command(wireFormat->unmarshal(this, this->inputStream));
+            Pointer<Command> command(impl->wireFormat->unmarshal(this, this->impl->inputStream));
 
             // Notify the listener.
             fire(command);
@@ -266,15 +294,58 @@ void IOTransport::run() {
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<FutureResponse> IOTransport::asyncRequest(const Pointer<Command> command AMQCPP_UNUSED,
                                                   const Pointer<ResponseCallback> responseCallback AMQCPP_UNUSED) {
-    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::asyncRequest() - unsupported operation");
+    throw UnsupportedOperationException(__FILE__, __LINE__,
+        "IOTransport::asyncRequest() - unsupported operation");
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED) {
-    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
+    throw UnsupportedOperationException(__FILE__, __LINE__,
+        "IOTransport::request() - unsupported operation");
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<Response> IOTransport::request(const Pointer<Command> command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED) {
-    throw UnsupportedOperationException(__FILE__, __LINE__, "IOTransport::request() - unsupported operation");
+    throw UnsupportedOperationException(__FILE__, __LINE__,
+        "IOTransport::request() - unsupported operation");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::setInputStream(decaf::io::DataInputStream* is) {
+    this->impl->inputStream = is;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::setOutputStream(decaf::io::DataOutputStream* os) {
+    this->impl->outputStream = os;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Pointer<wireformat::WireFormat> IOTransport::getWireFormat() const {
+    return this->impl->wireFormat;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
+    this->impl->wireFormat = wireFormat;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void IOTransport::setTransportListener(TransportListener* listener) {
+    this->impl->listener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransportListener* IOTransport::getTransportListener() const {
+    return this->impl->listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool IOTransport::isConnected() const {
+    return !this->impl->closed.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool IOTransport::isClosed() const {
+    return this->impl->closed.get();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.h Thu Feb  7 23:36:00 2013
@@ -40,6 +40,8 @@ namespace transport {
     using activemq::commands::Command;
     using activemq::commands::Response;
 
+    class IOTransportImpl;
+
     /**
      * Implementation of the Transport interface that performs marshaling of commands
      * to IO streams.  This class does not implement the request method, it only handles
@@ -56,35 +58,7 @@ namespace transport {
 
     private:
 
-        /**
-         * WireFormat instance to use to Encode / Decode.
-         */
-        Pointer<wireformat::WireFormat> wireFormat;
-
-        /**
-         * Listener of this transport.
-         */
-        TransportListener* listener;
-
-        /**
-         * The input stream for incoming commands.
-         */
-        decaf::io::DataInputStream* inputStream;
-
-        /**
-         * The output stream for out-going commands.
-         */
-        decaf::io::DataOutputStream* outputStream;
-
-        /**
-         * The polling thread.
-         */
-        Pointer<decaf::lang::Thread> thread;
-
-        /**
-         * Flag marking this transport as closed.
-         */
-        volatile bool closed;
+        IOTransportImpl* impl;
 
     private:
 
@@ -129,19 +103,14 @@ namespace transport {
          * @param is
          *      The InputStream that will be read from by this object.
          */
-        virtual void setInputStream(decaf::io::DataInputStream* is) {
-            this->inputStream = is;
-        }
-
+        virtual void setInputStream(decaf::io::DataInputStream* is);
         /**
          * Sets the stream to which this Transport implementation will write its data.
          *
          * @param os
          *      The OuputStream that will be written to by this object.
          */
-        virtual void setOutputStream(decaf::io::DataOutputStream* os) {
-            this->outputStream = os;
-        }
+        virtual void setOutputStream(decaf::io::DataOutputStream* os);
 
     public:  // Transport methods
 
@@ -169,21 +138,13 @@ namespace transport {
          */
         virtual Pointer<Response> request(const Pointer<Command> command, unsigned int timeout);
 
-        virtual Pointer<wireformat::WireFormat> getWireFormat() const {
-            return this->wireFormat;
-        }
+        virtual Pointer<wireformat::WireFormat> getWireFormat() const;
 
-        virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat) {
-            this->wireFormat = wireFormat;
-        }
+        virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat);
 
-        virtual void setTransportListener(TransportListener* listener) {
-            this->listener = listener;
-        }
+        virtual void setTransportListener(TransportListener* listener);
 
-        virtual TransportListener* getTransportListener() const {
-            return this->listener;
-        }
+        virtual TransportListener* getTransportListener() const;
 
         virtual void start();
 
@@ -203,13 +164,9 @@ namespace transport {
             return false;
         }
 
-        virtual bool isConnected() const {
-            return !this->closed;
-        }
+        virtual bool isConnected() const;
 
-        virtual bool isClosed() const {
-            return this->closed;
-        }
+        virtual bool isClosed() const;
 
         virtual std::string getRemoteAddress() const {
             return "";

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp Thu Feb  7 23:36:00 2013
@@ -50,9 +50,9 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-namespace activemq{
-namespace transport{
-namespace inactivity{
+namespace activemq {
+namespace transport {
+namespace inactivity {
 
     class InactivityMonitorData {
     private:
@@ -98,28 +98,29 @@ namespace inactivity{
 
         bool keepAliveResponseRequired;
 
-        InactivityMonitorData() : wireFormat(),
-                                  localWireFormatInfo(),
-                                  remoteWireFormatInfo(),
-                                  readCheckerTask(),
-                                  writeCheckerTask(),
-                                  readCheckTimer("InactivityMonitor Read Check Timer"),
-                                  writeCheckTimer("InactivityMonitor Write Check Timer"),
-                                  asyncTasks(),
-                                  asyncReadTask(),
-                                  asyncWriteTask(),
-                                  monitorStarted(),
-                                  commandSent(),
-                                  commandReceived(),
-                                  failed(),
-                                  inRead(),
-                                  inWrite(),
-                                  inWriteMutex(),
-                                  monitor(),
-                                  readCheckTime(0),
-                                  writeCheckTime(0),
-                                  initialDelayTime(0),
-                                  keepAliveResponseRequired(false) {
+        InactivityMonitorData(const Pointer<WireFormat> wireFormat) :
+            wireFormat(wireFormat),
+            localWireFormatInfo(),
+            remoteWireFormatInfo(),
+            readCheckerTask(),
+            writeCheckerTask(),
+            readCheckTimer("InactivityMonitor Read Check Timer"),
+            writeCheckTimer("InactivityMonitor Write Check Timer"),
+            asyncTasks(),
+            asyncReadTask(),
+            asyncWriteTask(),
+            monitorStarted(),
+            commandSent(),
+            commandReceived(true),
+            failed(),
+            inRead(),
+            inWrite(),
+            inWriteMutex(),
+            monitor(),
+            readCheckTime(0),
+            writeCheckTime(0),
+            initialDelayTime(0),
+            keepAliveResponseRequired(false) {
         }
     };
 
@@ -206,35 +207,13 @@ namespace inactivity{
 
 ////////////////////////////////////////////////////////////////////////////////
 InactivityMonitor::InactivityMonitor(const Pointer<Transport> next, const Pointer<WireFormat> wireFormat) :
-    TransportFilter(next), members(new InactivityMonitorData()) {
-
-    this->members->wireFormat = wireFormat;
-    this->members->monitorStarted.set(false);
-    this->members->commandSent.set(false);
-    this->members->commandReceived.set(true);
-    this->members->failed.set(false);
-    this->members->inRead.set(false);
-    this->members->inWrite.set(false);
-    this->members->readCheckTime = 0;
-    this->members->writeCheckTime = 0;
-    this->members->initialDelayTime = 0;
-    this->members->keepAliveResponseRequired = false;
+    TransportFilter(next), members(new InactivityMonitorData(wireFormat)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 InactivityMonitor::InactivityMonitor(const Pointer<Transport> next, const decaf::util::Properties& properties, const Pointer<wireformat::WireFormat> wireFormat) :
-    TransportFilter(next), members(new InactivityMonitorData()) {
+    TransportFilter(next), members(new InactivityMonitorData(wireFormat)) {
 
-    this->members->wireFormat = wireFormat;
-    this->members->monitorStarted.set(false);
-    this->members->commandSent.set(false);
-    this->members->commandReceived.set(true);
-    this->members->failed.set(false);
-    this->members->inRead.set(false);
-    this->members->inWrite.set(false);
-    this->members->readCheckTime = 0;
-    this->members->writeCheckTime = 0;
-    this->members->initialDelayTime = 0;
     this->members->keepAliveResponseRequired = Boolean::parseBoolean(properties.getProperty("keepAliveResponseRequired", "false"));
 }
 
@@ -292,6 +271,28 @@ void InactivityMonitor::setKeepAliveResp
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::start() {
+    try {
+        TransportFilter::start();
+        startMonitorThreads();
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void InactivityMonitor::stop() {
+    try {
+        stopMonitorThreads();
+        TransportFilter::stop();
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::close() {
     try {
         setTransportListener(NULL);
@@ -426,17 +427,19 @@ void InactivityMonitor::writeCheck() {
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::startMonitorThreads() {
 
-    synchronized( &this->members->monitor ) {
+    if (this->members->monitorStarted.get()) {
+        return;
+    }
 
-        if (this->members->monitorStarted.get()) {
-            return;
-        }
-        if (this->members->localWireFormatInfo == NULL) {
-            return;
-        }
-        if (this->members->remoteWireFormatInfo == NULL) {
-            return;
-        }
+    if (this->members->localWireFormatInfo == NULL) {
+        return;
+    }
+
+    if (this->members->remoteWireFormatInfo == NULL) {
+        return;
+    }
+
+    synchronized( &this->members->monitor ) {
 
         this->members->asyncTasks.reset(new CompositeTaskRunner());
         this->members->asyncReadTask.reset(new AsyncSignalReadErrorkTask(this, this->getRemoteAddress()));
@@ -467,13 +470,9 @@ void InactivityMonitor::startMonitorThre
 ////////////////////////////////////////////////////////////////////////////////
 void InactivityMonitor::stopMonitorThreads() {
 
-    if (this->members == NULL) {
-        return;
-    }
-
-    synchronized(&this->members->monitor) {
+    if (this->members->monitorStarted.compareAndSet(true, false)) {
 
-        if (this->members->monitorStarted.compareAndSet(true, false)) {
+        synchronized(&this->members->monitor) {
 
             this->members->readCheckerTask->cancel();
             this->members->writeCheckerTask->cancel();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h Thu Feb  7 23:36:00 2013
@@ -68,6 +68,10 @@ namespace inactivity {
 
     public: // TransportFilter Methods
 
+        virtual void start();
+
+        virtual void stop();
+
         virtual void close();
 
         virtual void onException(const decaf::lang::Exception& ex);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.cpp Thu Feb  7 23:36:00 2013
@@ -38,7 +38,8 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-SslTransport::SslTransport(const Pointer<Transport> next) : TcpTransport(next) {
+SslTransport::SslTransport(const Pointer<Transport> next, const decaf::net::URI& location) :
+    TcpTransport(next, location) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -59,7 +60,7 @@ Socket* SslTransport::createSocket() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void SslTransport::configureSocket(Socket* socket, decaf::util::Properties& properties) {
+void SslTransport::configureSocket(Socket* socket) {
 
     try {
 
@@ -73,7 +74,7 @@ void SslTransport::configureSocket(Socke
                 "Socket passed was not an SSLSocket instance.");
         }
 
-        TcpTransport::configureSocket(socket, properties);
+        TcpTransport::configureSocket(socket);
     }
     DECAF_CATCH_RETHROW(NullPointerException)
     DECAF_CATCH_RETHROW(IllegalArgumentException)

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransport.h Thu Feb  7 23:36:00 2013
@@ -45,9 +45,12 @@ namespace tcp {
          * Creates a new instance of the SslTransport, the transport will not attempt to
          * connect to a remote host until the connect method is called.
          *
-         * @param next the next transport in the chain
+         * @param next
+         *      The next transport in the chain
+         * @param location
+         *      The URI of the host this transport is to connect to.
          */
-        SslTransport(const Pointer<Transport> next);
+        SslTransport(const Pointer<Transport> next, const decaf::net::URI& location);
 
         virtual ~SslTransport();
 
@@ -61,7 +64,7 @@ namespace tcp {
         /**
          * {@inheritDoc}
          */
-        virtual void configureSocket(decaf::net::Socket* socket, decaf::util::Properties& properties);
+        virtual void configureSocket(decaf::net::Socket* socket);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/SslTransportFactory.cpp Thu Feb  7 23:36:00 2013
@@ -50,9 +50,13 @@ Pointer<Transport> SslTransportFactory::
 
     try {
 
-        Pointer<Transport> transport(new SslTransport(Pointer<Transport>(new IOTransport(wireFormat))));
+        Pointer<Transport> transport(new IOTransport(wireFormat));
 
-        transport.dynamicCast<SslTransport>()->connect(location, properties);
+        transport.reset(new SslTransport(transport, location));
+
+        // Give this class and any derived classes a chance to apply value that
+        // are set in the properties object.
+        doConfigureTransport(transport, properties);
 
         if (properties.getProperty("transport.useInactivityMonitor", "true") == "true") {
             transport.reset(new InactivityMonitor(transport, properties, wireFormat));

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp Thu Feb  7 23:36:00 2013
@@ -22,9 +22,8 @@
 
 #include <decaf/lang/exceptions/NullPointerException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
-#include <decaf/lang/Integer.h>
-#include <decaf/lang/Boolean.h>
 #include <decaf/net/SocketFactory.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
 
 #include <memory>
 
@@ -37,13 +36,68 @@ using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::net;
 using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 using namespace decaf::io;
 using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
+namespace activemq {
+namespace transport {
+namespace tcp {
+
+    class TcpTransportImpl {
+    private:
+
+        TcpTransportImpl(const TcpTransportImpl&);
+        TcpTransportImpl& operator= (const TcpTransportImpl&);
+
+    public:
+
+        int connectTimeout;
+
+        AtomicBoolean closed;
+        AtomicBoolean started;
+
+        std::auto_ptr<decaf::net::Socket> socket;
+        std::auto_ptr<decaf::io::DataInputStream> dataInputStream;
+        std::auto_ptr<decaf::io::DataOutputStream> dataOutputStream;
+
+        const decaf::net::URI& location;
+
+        int outputBufferSize;
+        int inputBufferSize;
+
+        bool trace;
+
+        int soLinger;
+        bool soKeepAlive;
+        int soReceiveBufferSize;
+        int soSendBufferSize;
+        bool tcpNoDelay;
+
+        TcpTransportImpl(const decaf::net::URI& location) :
+            connectTimeout(0),
+            closed(false),
+            socket(),
+            dataInputStream(),
+            dataOutputStream(),
+            location(location),
+            outputBufferSize(8192),
+            inputBufferSize(8192),
+            trace(false),
+            soLinger(-1),
+            soKeepAlive(false),
+            soReceiveBufferSize(-1),
+            soSendBufferSize(-1),
+            tcpNoDelay(true) {
+        }
+    };
+}}}
+
 ////////////////////////////////////////////////////////////////////////////////
-TcpTransport::TcpTransport(const Pointer<Transport> next) :
-    TransportFilter(next), connectTimeout(0), closed(false), socket(), dataInputStream(), dataOutputStream() {
+TcpTransport::TcpTransport(const Pointer<Transport> next, const decaf::net::URI& location) :
+    TransportFilter(next), impl(new TcpTransportImpl(location)) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -52,23 +106,69 @@ TcpTransport::~TcpTransport() {
         close();
     }
     AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete this->impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::close() {
+void TcpTransport::start() {
+    try {
+
+        if (this->impl->closed.get()) {
+            throw IOException(__FILE__, __LINE__, "Transport is closed");
+        }
+
+        if (this->impl->started.compareAndSet(false, true)) {
+            connect();
+            TransportFilter::start();
+        }
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
+}
 
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::stop() {
     try {
 
-        this->closed = true;
-        this->setTransportListener(NULL);
+        if (this->impl->closed.get()) {
+            throw IOException(__FILE__, __LINE__, "Transport is closed");
+        }
+
+        if (this->impl->started.compareAndSet(true, false)) {
 
-        // Close the socket.
-        if (socket.get() != NULL) {
-            socket->close();
+            // Close the socket.
+            if (impl->socket.get() != NULL) {
+                impl->socket->close();
+            }
+
+            TransportFilter::stop();
         }
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
+}
 
-        // Invoke the paren't close first.
-        TransportFilter::close();
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::close() {
+
+    try {
+
+        if (this->impl->closed.compareAndSet(false, true)) {
+            this->setTransportListener(NULL);
+
+            // Close the socket.
+            if (impl->socket.get() != NULL) {
+                impl->socket->close();
+            }
+
+            TransportFilter::close();
+        }
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
@@ -76,25 +176,28 @@ void TcpTransport::close() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::connect(const decaf::net::URI& uri, const decaf::util::Properties& properties) {
+void TcpTransport::connect() {
 
     try {
 
-        socket.reset(this->createSocket());
+        impl->socket.reset(this->createSocket());
 
         // Set all Socket Options from the URI options.
-        this->configureSocket(socket.get(), properties);
+        this->configureSocket(impl->socket.get());
+
+        URI uri = this->impl->location;
 
         // Ensure something is actually passed in for the URI
         if (uri.getAuthority() == "") {
-            throw SocketException(__FILE__, __LINE__, "Connection URI was not provided or is invalid: %s", uri.toString().c_str());
+            throw SocketException(__FILE__, __LINE__,
+                "Connection URI was not provided or is invalid: %s", uri.toString().c_str());
         }
 
         // Connect the socket.
         string host = uri.getHost();
         int port = uri.getPort();
 
-        socket->connect(host, port, connectTimeout);
+        impl->socket->connect(host, port, impl->connectTimeout);
 
         // Cast it to an IO transport so we can wire up the socket
         // input and output streams.
@@ -105,20 +208,20 @@ void TcpTransport::connect(const decaf::
         }
 
         // Get the read buffer size.
-        int inputBufferSize = Integer::parseInt(properties.getProperty("inputBufferSize", "8192"));
+        int inputBufferSize = this->impl->inputBufferSize;
 
         // Get the write buffer size.
-        int outputBufferSize = Integer::parseInt(properties.getProperty("outputBufferSize", "8192"));
+        int outputBufferSize = this->impl->outputBufferSize;
 
         // We don't own these ever, socket object owns.
-        InputStream* socketIStream = socket->getInputStream();
-        OutputStream* sokcetOStream = socket->getOutputStream();
+        InputStream* socketIStream = impl->socket->getInputStream();
+        OutputStream* sokcetOStream = impl->socket->getOutputStream();
 
         Pointer<InputStream> inputStream;
         Pointer<OutputStream> outputStream;
 
         // If tcp tracing was enabled, wrap the input / output streams with logging streams
-        if (properties.getProperty("transport.tcpTracingEnabled", "false") == "true") {
+        if (this->impl->trace) {
             // Wrap with logging stream, we don't own the wrapped streams
             inputStream.reset(new LoggingInputStream(socketIStream));
             outputStream.reset(new LoggingOutputStream(sokcetOStream));
@@ -135,12 +238,12 @@ void TcpTransport::connect(const decaf::
         // Now wrap the Buffered Streams with DataInput based streams.  We own
         // the Source streams, all the streams in the chain that we own are
         // destroyed when these are.
-        this->dataInputStream.reset(new DataInputStream(inputStream.release(), true));
-        this->dataOutputStream.reset(new DataOutputStream(outputStream.release(), true));
+        this->impl->dataInputStream.reset(new DataInputStream(inputStream.release(), true));
+        this->impl->dataOutputStream.reset(new DataOutputStream(outputStream.release(), true));
 
         // Give the IOTransport the streams.
-        ioTransport->setInputStream(dataInputStream.get());
-        ioTransport->setOutputStream(dataOutputStream.get());
+        ioTransport->setInputStream(impl->dataInputStream.get());
+        ioTransport->setOutputStream(impl->dataOutputStream.get());
     }
     AMQ_CATCH_RETHROW(ActiveMQException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
@@ -160,31 +263,17 @@ Socket* TcpTransport::createSocket() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::configureSocket(Socket* socket, const Properties& properties) {
+void TcpTransport::configureSocket(Socket* socket) {
 
     try {
 
-        // Get the linger flag.
-        int soLinger = Integer::parseInt(properties.getProperty("soLinger", "-1"));
-
-        // Get the keepAlive flag.
-        bool soKeepAlive = Boolean::parseBoolean(properties.getProperty("soKeepAlive", "false"));
-
-        // Get the socket receive buffer size.
-        int soReceiveBufferSize = Integer::parseInt(properties.getProperty("soReceiveBufferSize", "-1"));
-
-        // Get the socket send buffer size.
-        int soSendBufferSize = Integer::parseInt(properties.getProperty("soSendBufferSize", "-1"));
-
-        // Get the socket TCP_NODELAY flag.
-        bool tcpNoDelay = Boolean::parseBoolean(properties.getProperty("tcpNoDelay", "true"));
-
-        // Get the socket connect timeout in microseconds. (default to infinite wait).
-        this->connectTimeout = Integer::parseInt(properties.getProperty("soConnectTimeout", "0"));
+        int soLinger = this->impl->soLinger;
+        int soReceiveBufferSize = this->impl->soReceiveBufferSize;
+        int soSendBufferSize = this->impl->soSendBufferSize;
 
         // Set the socket options.
-        socket->setKeepAlive(soKeepAlive);
-        socket->setTcpNoDelay(tcpNoDelay);
+        socket->setKeepAlive(this->impl->soKeepAlive);
+        socket->setTcpNoDelay(this->impl->tcpNoDelay);
 
         if (soLinger > 0) {
             socket->setSoLinger(true, soLinger);
@@ -204,3 +293,107 @@ void TcpTransport::configureSocket(Socke
     DECAF_CATCH_EXCEPTION_CONVERT(Exception, SocketException)
     DECAF_CATCHALL_THROW(SocketException)
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpTransport::isClosed() const {
+    return this->impl->closed.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpTransport::isConnected() const {
+    if (this->impl->socket.get() != NULL) {
+        return this->impl->socket->isConnected();
+    }
+
+    return false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setConnectTimeout(int soConnectTimeout) {
+    this->impl->connectTimeout = soConnectTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getConnectTimeout() const {
+    return this->impl->connectTimeout;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setOutputBufferSize(int outputBufferSize) {
+    this->impl->outputBufferSize = outputBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getOutputBufferSize() const {
+    return this->impl->outputBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setInputBufferSize(int inputBufferSize) {
+    this->impl->inputBufferSize = inputBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getInputBufferSize() const {
+    return this->impl->inputBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setTrace(bool trace) {
+    this->impl->trace = trace;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpTransport::isTrace() const {
+    return this->impl->trace;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setLinger(int soLinger) {
+    this->impl->soLinger = soLinger;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getLinger() const {
+    return this->impl->soLinger;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setKeepAlive(bool soKeepAlive) {
+    this->impl->soKeepAlive = soKeepAlive;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpTransport::isKeepAlive() const {
+    return this->impl->soKeepAlive;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setReceiveBufferSize(int soReceiveBufferSize) {
+    this->impl->soReceiveBufferSize = soReceiveBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getReceiveBufferSize() const {
+    return this->impl->soReceiveBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setSendBufferSize(int soSendBufferSize) {
+    this->impl->soSendBufferSize = soSendBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int TcpTransport::getSendBufferSize() const {
+    return this->impl->soSendBufferSize;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransport::setTcpNoDelay(bool tcpNoDelay) {
+    this->impl->tcpNoDelay = tcpNoDelay;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool TcpTransport::isTcpNoDelay() const {
+    return this->impl->tcpNoDelay;
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h Thu Feb  7 23:36:00 2013
@@ -32,12 +32,14 @@
 #include <decaf/io/DataOutputStream.h>
 #include <memory>
 
-namespace activemq{
-namespace transport{
-namespace tcp{
+namespace activemq {
+namespace transport {
+namespace tcp {
 
     using decaf::lang::Pointer;
 
+    class TcpTransportImpl;
+
     /**
      * Implements a TCP/IP based transport filter, this transport is meant to
      * wrap an instance of an IOTransport.  The lower level transport should take
@@ -46,30 +48,7 @@ namespace tcp{
     class AMQCPP_API TcpTransport: public TransportFilter {
     private:
 
-        /**
-         * Stores the URI configured Socket connect timeout.
-         */
-        int connectTimeout;
-
-        /**
-         * has close been called.
-         */
-        bool closed;
-
-        /**
-         * Socket that this Transport Communicates with
-         */
-        std::auto_ptr<decaf::net::Socket> socket;
-
-        /**
-         * Input Stream for Reading in Messages
-         */
-        std::auto_ptr<decaf::io::DataInputStream> dataInputStream;
-
-        /**
-         * Output Stream for Writing out Messages.
-         */
-        std::auto_ptr<decaf::io::DataOutputStream> dataOutputStream;
+        TcpTransportImpl* impl;
 
     private:
 
@@ -84,46 +63,65 @@ namespace tcp{
          *
          * @param next
          *      The next transport in the chain
+         * @param location
+         *      The URI of the host this transport is to connect to.
          */
-        TcpTransport(const Pointer<Transport> next);
+        TcpTransport(const Pointer<Transport> next, const decaf::net::URI& location);
 
         virtual ~TcpTransport();
 
-        /**
-         * Creates a Socket and configures it before attempting to connect to the location specified
-         * by the URI passed in.  The Socket is configured using parameters in the properties that
-         * are passed to this method.
-         *
-         * @param uri
-         *      The URI that the Transport is to connect to once initialized.
-         * @param properties
-         *      The Properties that have been parsed from the URI or from configuration files.
-         */
-        void connect(const decaf::net::URI& uri, const decaf::util::Properties& properties);
+        void setConnectTimeout(int soConnectTimeout);
+        int getConnectTimeout() const;
+
+        void setOutputBufferSize(int outputBufferSize);
+        int getOutputBufferSize() const;
+
+        void setInputBufferSize(int inputBufferSize);
+        int getInputBufferSize() const;
+
+        void setTrace(bool trace);
+        bool isTrace() const;
+
+        void setLinger(int soLinger);
+        int getLinger() const;
+
+        void setKeepAlive(bool soKeepAlive);
+        bool isKeepAlive() const;
+
+        void setReceiveBufferSize(int soReceiveBufferSize);
+        int getReceiveBufferSize() const;
+
+        void setSendBufferSize(int soSendBufferSize);
+        int getSendBufferSize() const;
+
+        void setTcpNoDelay(bool tcpNoDelay);
+        bool isTcpNoDelay() const;
 
     public: // Transport Methods
 
+        virtual void start();
+
+        virtual void stop();
+
         virtual void close();
 
         virtual bool isFaultTolerant() const {
             return false;
         }
 
-        virtual bool isConnected() const {
-            if (this->socket.get() != NULL) {
-                return this->socket->isConnected();
-            }
+        virtual bool isConnected() const;
 
-            return false;
-        }
-
-        virtual bool isClosed() const {
-            return this->closed;
-        }
+        virtual bool isClosed() const;
 
     protected:
 
         /**
+         * Creates a Socket and configures it before attempting to connect to the location specified
+         * by the URI passed in.
+         */
+        void connect();
+
+        /**
          * Create an unconnected Socket instance to be used by the transport to communicate
          * with the broker.
          *
@@ -146,7 +144,7 @@ namespace tcp{
          * @throw IllegalArgumentException if the socket instance is not handled by the class.
          * @throw SocketException if there is an error while setting one of the Socket options.
          */
-        virtual void configureSocket(decaf::net::Socket* socket, const decaf::util::Properties& properties);
+        virtual void configureSocket(decaf::net::Socket* socket);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.cpp Thu Feb  7 23:36:00 2013
@@ -25,6 +25,8 @@
 #include <activemq/util/URISupport.h>
 #include <activemq/wireformat/WireFormat.h>
 #include <decaf/util/Properties.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Boolean.h>
 
 using namespace activemq;
 using namespace activemq::util;
@@ -37,6 +39,7 @@ using namespace activemq::transport::ina
 using namespace activemq::exceptions;
 using namespace decaf;
 using namespace decaf::lang;
+using namespace decaf::util;
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<Transport> TcpTransportFactory::create(const decaf::net::URI& location) {
@@ -47,10 +50,10 @@ Pointer<Transport> TcpTransportFactory::
 
         Pointer<WireFormat> wireFormat = this->createWireFormat(properties);
 
-        // Create the initial Transport, then wrap it in the normal Filters
+        // Create the initial Composite Transport, then wrap it in the normal Filters
+        // for a non-composite Transport which right now is just a ResponseCorrelator
         Pointer<Transport> transport(doCreateComposite(location, wireFormat, properties));
 
-        // Create the Transport for response correlator
         transport.reset(new ResponseCorrelator(transport));
 
         return transport;
@@ -84,10 +87,13 @@ Pointer<Transport> TcpTransportFactory::
 
     try {
 
-        Pointer<Transport> transport(new TcpTransport(Pointer<Transport>(new IOTransport(wireFormat))));
+        Pointer<Transport> transport(new IOTransport(wireFormat));
+
+        transport.reset(new TcpTransport(transport, location));
 
-        // Initialize the Transport, creates Sockets and configures defaults.
-        transport.dynamicCast<TcpTransport>()->connect(location, properties);
+        // Give this class and any derived classes a chance to apply value that
+        // are set in the properties object.
+        doConfigureTransport(transport, properties);
 
         if (properties.getProperty("transport.useInactivityMonitor", "true") == "true") {
             transport.reset(new InactivityMonitor(transport, properties, wireFormat));
@@ -100,11 +106,9 @@ Pointer<Transport> TcpTransportFactory::
             properties.getProperty("transport.useLogging", "false") == "true" ||
             properties.getProperty("transport.trace", "false") == "true") {
 
-            // Create the Transport for response correlator
             transport.reset(new LoggingTransport(transport));
         }
 
-        // If there is a negotiator need then we create and wrap here.
         if (wireFormat->hasNegotiator()) {
             transport = wireFormat->createNegotiator(transport);
         }
@@ -115,3 +119,26 @@ Pointer<Transport> TcpTransportFactory::
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
     AMQ_CATCHALL_THROW(ActiveMQException)
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void TcpTransportFactory::doConfigureTransport(Pointer<Transport> transport,
+                                               const decaf::util::Properties& properties) {
+
+    try {
+
+        Pointer<TcpTransport> tcp = transport.dynamicCast<TcpTransport>();
+
+        tcp->setInputBufferSize(Integer::parseInt(properties.getProperty("inputBufferSize", "8192")));
+        tcp->setOutputBufferSize(Integer::parseInt(properties.getProperty("outputBufferSize", "8192")));
+        tcp->setTrace(Boolean::parseBoolean(properties.getProperty("transport.tcpTracingEnabled", "false")));
+        tcp->setLinger(Integer::parseInt(properties.getProperty("soLinger", "-1")));
+        tcp->setKeepAlive(Boolean::parseBoolean(properties.getProperty("soKeepAlive", "false")));
+        tcp->setReceiveBufferSize(Integer::parseInt(properties.getProperty("soReceiveBufferSize", "-1")));
+        tcp->setSendBufferSize(Integer::parseInt(properties.getProperty("soSendBufferSize", "-1")));
+        tcp->setTcpNoDelay(Boolean::parseBoolean(properties.getProperty("tcpNoDelay", "true")));
+        tcp->setConnectTimeout(Integer::parseInt(properties.getProperty("soConnectTimeout", "0")));
+    }
+    AMQ_CATCH_RETHROW(ActiveMQException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, ActiveMQException)
+    AMQ_CATCHALL_THROW(ActiveMQException)
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h?rev=1443781&r1=1443780&r2=1443781&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransportFactory.h Thu Feb  7 23:36:00 2013
@@ -44,7 +44,10 @@ namespace tcp{
 
         virtual Pointer<Transport> doCreateComposite(const decaf::net::URI& location,
                                                      const Pointer<wireformat::WireFormat> wireFormat,
-                                                     const decaf::util::Properties& properties );
+                                                     const decaf::util::Properties& properties);
+
+        virtual void doConfigureTransport(Pointer<Transport>, const decaf::util::Properties& properties);
+
     };
 
 }}}



Mime
View raw message