activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1444434 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: transport/ transport/correlator/ transport/inactivity/ transport/tcp/ wireformat/openwire/
Date Sat, 09 Feb 2013 21:03:23 GMT
Author: tabish
Date: Sat Feb  9 21:03:23 2013
New Revision: 1444434

URL: http://svn.apache.org/r1444434
Log:
https://issues.apache.org/jira/browse/AMQCPP-457

Better control over Transport start and stop.

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/IOTransport.cpp Sat
Feb  9 21:03:23 2013
@@ -183,6 +183,7 @@ void IOTransport::start() {
 void IOTransport::stop() {
 
     try {
+        this->impl->started.set(false);
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.cpp
Sat Feb  9 21:03:23 2013
@@ -18,59 +18,107 @@
 #include "TransportFilter.h"
 #include <decaf/io/IOException.h>
 
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+
 #include <activemq/wireformat/WireFormat.h>
 
 using namespace activemq;
 using namespace activemq::transport;
 using namespace decaf::lang;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::util::concurrent::atomic;
 using namespace decaf::io;
 
 ////////////////////////////////////////////////////////////////////////////////
-TransportFilter::TransportFilter(const Pointer<Transport> next) : next(next), listener(NULL)
{
+namespace activemq {
+namespace transport {
+
+    class TransportFilterImpl {
+
+        TransportFilterImpl(const TransportFilterImpl&);
+        TransportFilterImpl& operator= (const TransportFilterImpl&);
+
+    public:
+
+        AtomicBoolean closed;
+        AtomicBoolean started;
+
+        TransportFilterImpl() : closed(), started() {
+        }
+    };
+
+}}
+
+////////////////////////////////////////////////////////////////////////////////
+TransportFilter::TransportFilter(const Pointer<Transport> next) :
+    impl(new TransportFilterImpl()), next(next), listener(NULL) {
+
     // Observe the nested transport for events.
     next->setTransportListener(this);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 TransportFilter::~TransportFilter() {
+    try {
+        close();
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
+        // Force next out here so we can ensure we catch any stray
+        // exceptions.  Since we hold the only reference to next it
+        // should get deleted.
+        this->next.reset(NULL);
+    }
+    AMQ_CATCHALL_NOTHROW()
+
+    try {
+        delete this->impl;
+    }
+    AMQ_CATCHALL_NOTHROW()
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::onCommand(const Pointer<Command> command) {
-    fire(command);
+
+    if (!this->impl->started.get() || this->impl->closed.get()) {
+        return;
+    }
+
+    try {
+        if (this->listener != NULL) {
+            this->listener->onCommand(command);
+        }
+    } catch (...) {
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::onException(const decaf::lang::Exception& ex) {
-    fire(ex);
-}
 
-////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::fire(const decaf::lang::Exception& ex) {
+    if (!this->impl->started.get() || this->impl->closed.get()) {
+        return;
+    }
 
-    if (listener != NULL) {
+    if (this->listener != NULL) {
         try {
-            listener->onException(ex);
+            this->listener->onException(ex);
         } catch (...) {
         }
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::fire(const Pointer<Command> command) {
-    try {
-        if (listener != NULL) {
-            listener->onCommand(command);
-        }
-    } catch (...) {
+void TransportFilter::transportInterrupted() {
+
+    if (!this->impl->started.get() || this->impl->closed.get()) {
+        return;
     }
-}
 
-////////////////////////////////////////////////////////////////////////////////
-void TransportFilter::transportInterrupted() {
     try {
-        if (listener != NULL) {
-            listener->transportInterrupted();
+        if (this->listener != NULL) {
+            this->listener->transportInterrupted();
         }
     } catch (...) {
     }
@@ -78,9 +126,14 @@ void TransportFilter::transportInterrupt
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::transportResumed() {
+
+    if (!this->impl->started.get() || this->impl->closed.get()) {
+        return;
+    }
+
     try {
-        if (listener != NULL) {
-            listener->transportResumed();
+        if (this->listener != NULL) {
+            this->listener->transportResumed();
         }
     } catch (...) {
     }
@@ -89,34 +142,132 @@ void TransportFilter::transportResumed()
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::start() {
 
-    if (listener == NULL) {
+    if (this->impl->closed.get()) {
+        return;
+    }
+
+    if (this->listener == NULL) {
         throw decaf::io::IOException(__FILE__, __LINE__, "exceptionListener is invalid");
     }
 
-    // Start the delegate transport object.
-    next->start();
+    if (this->next == NULL) {
+        throw decaf::io::IOException(__FILE__, __LINE__, "Transport chain is invalid");
+    }
+
+    try {
+
+        if (this->impl->started.compareAndSet(false, true)) {
+            beforeNextIsStarted();
+            next->start();
+            afterNextIsStarted();
+        }
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::stop() {
-    next->stop();
+
+    if (this->impl->closed.get()) {
+        return;
+    }
+
+    try {
+
+        if (this->impl->started.compareAndSet(true, false)) {
+
+            if (this->next == NULL) {
+                throw decaf::io::IOException(__FILE__, __LINE__, "Transport chain is invalid");
+            }
+
+            IOException error;
+            bool hasException = false;
+
+            try {
+                beforeNextIsStopped();
+            } catch (IOException& ex) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+
+            try {
+                next->stop();
+            } catch (IOException& ex) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+
+            try {
+                afterNextIsStopped();
+            } catch (IOException& ex) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+
+            if (hasException) {
+                throw error;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::close() {
 
-    if (next != NULL) {
-        next->setTransportListener(NULL);
-        next->close();
-        next.reset(NULL);
+    if (this->impl->closed.get()) {
+        return;
     }
 
-    listener = NULL;
+    try {
+
+        if (this->impl->closed.compareAndSet(false, true)) {
+
+            if (this->next == NULL) {
+                throw decaf::io::IOException(__FILE__, __LINE__, "Transport chain is invalid");
+            }
+
+            IOException error;
+            bool hasException = false;
+
+            next->setTransportListener(NULL);
+
+            try {
+                next->close();
+            } catch (IOException& ex) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+
+            try {
+                doClose();
+            } catch (IOException& ex) {
+                error = ex;
+                error.setMark(__FILE__, __LINE__);
+                hasException = true;
+            }
+
+            if (hasException) {
+                throw error;
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW(IOException)
+    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
+    AMQ_CATCHALL_THROW(IOException)
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 Transport* TransportFilter::narrow(const std::type_info& typeId) {
-    if (typeid( *this ) == typeId) {
+    if (typeid(*this) == typeId) {
         return this;
     } else if (this->next != NULL) {
         return this->next->narrow(typeId);
@@ -128,6 +279,8 @@ Transport* TransportFilter::narrow(const
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::reconnect(const decaf::net::URI& uri) {
 
+    checkClosed();
+
     try {
         next->reconnect(uri);
     }
@@ -138,10 +291,24 @@ void TransportFilter::reconnect(const de
 
 ////////////////////////////////////////////////////////////////////////////////
 Pointer<wireformat::WireFormat> TransportFilter::getWireFormat() const {
+    checkClosed();
     return next->getWireFormat();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void TransportFilter::setWireFormat(const Pointer<wireformat::WireFormat> wireFormat)
{
+    checkClosed();
     next->setWireFormat(wireFormat);
 }
+
+////////////////////////////////////////////////////////////////////////////////
+bool TransportFilter::isClosed() const {
+    return this->impl->closed.get();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void TransportFilter::checkClosed() const {
+    if (this->impl->closed.get()) {
+        throw IOException(__FILE__, __LINE__, "Transport is closed");
+    }
+}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/TransportFilter.h
Sat Feb  9 21:03:23 2013
@@ -26,13 +26,15 @@
 #include <decaf/lang/Pointer.h>
 #include <typeinfo>
 
-namespace activemq{
-namespace transport{
+namespace activemq {
+namespace transport {
 
     using decaf::lang::Pointer;
     using activemq::commands::Command;
     using activemq::commands::Response;
 
+    class TransportFilterImpl;
+
     /**
      * A filter on the transport layer.  Transport filters implement the Transport
      * interface and optionally delegate calls to another Transport object.
@@ -40,6 +42,10 @@ namespace transport{
      * @since 1.0
      */
     class AMQCPP_API TransportFilter: public Transport, public TransportListener {
+    private:
+
+        TransportFilterImpl* impl;
+
     protected:
 
         /**
@@ -52,20 +58,6 @@ namespace transport{
          */
         TransportListener* listener;
 
-    protected:
-
-        /**
-         * Notify the listener of the thrown Exception.
-         * @param ex - the exception to send to listeners
-         */
-        void fire(const decaf::lang::Exception& ex);
-
-        /**
-         * Notify the listener of the new incoming Command.
-         * @param command - the command to send to the listener
-         */
-        void fire(const Pointer<Command> command);
-
     private:
 
         TransportFilter(const TransportFilter&);
@@ -81,8 +73,20 @@ namespace transport{
 
         virtual ~TransportFilter();
 
+        void start();
+
+        void stop();
+
+        void close();
+
+    protected:
+
+        /**
+         * Throws an IOException if this filter chain has already been closed.
+         */
+        void checkClosed() const;
+
     public:
-        // TransportListener methods
 
         /**
          * Event handler for the receipt of a command.
@@ -108,22 +112,25 @@ namespace transport{
         virtual void transportResumed();
 
     public:
-        // Transport Methods.
 
         virtual void oneway(const Pointer<Command> command) {
+            checkClosed();
             next->oneway(command);
         }
 
         virtual Pointer<FutureResponse> asyncRequest(const Pointer<Command> command,
                                                      const Pointer<ResponseCallback>
responseCallback) {
+            checkClosed();
             return next->asyncRequest(command, responseCallback);
         }
 
         virtual Pointer<Response> request(const Pointer<Command> command) {
+            checkClosed();
             return next->request(command);
         }
 
         virtual Pointer<Response> request(const Pointer<Command> command, unsigned
int timeout) {
+            checkClosed();
             return next->request(command, timeout);
         }
 
@@ -139,44 +146,79 @@ namespace transport{
 
         virtual void setWireFormat(const Pointer<wireformat::WireFormat> wireFormat);
 
-        virtual void start();
-
-        virtual void stop();
-
-        virtual void close();
-
         virtual Transport* narrow(const std::type_info& typeId);
 
         virtual bool isFaultTolerant() const {
-            return next->isFaultTolerant();
+            return !isClosed() && next->isFaultTolerant();
         }
 
         virtual bool isConnected() const {
-            return next->isConnected();
+            return !isClosed() && next->isConnected();
         }
 
         virtual bool isReconnectSupported() const {
-            return next->isReconnectSupported();
+            return !isClosed() && next->isReconnectSupported();
         }
 
         virtual bool isUpdateURIsSupported() const {
-            return next->isUpdateURIsSupported();
+            return !isClosed() && next->isUpdateURIsSupported();
         }
 
-        virtual bool isClosed() const {
-            return next->isClosed();
-        }
+        virtual bool isClosed() const;
 
         virtual std::string getRemoteAddress() const {
+
+            if (isClosed()) {
+                return "";
+            }
+
             return next->getRemoteAddress();
         }
 
         virtual void reconnect(const decaf::net::URI& uri);
 
         virtual void updateURIs(bool rebalance, const decaf::util::List<decaf::net::URI>&
uris) {
+            checkClosed();
             next->updateURIs(rebalance, uris);
         }
 
+    protected:
+
+        /**
+         * Subclasses can override this method to do their own startup work.  This method
+         * will always be called before the next transport in the chain is called in order
+         * to allow this transport a chance to initialize required resources.
+         */
+        virtual void beforeNextIsStarted() {}
+
+        /**
+         * Subclasses can override this method to do their own post startup work.  This method
+         * will always be called after the beforeNextIsStarted() method and the next transport's
+         * own start() method have been successfully run.
+         */
+        virtual void afterNextIsStarted() {}
+
+        /**
+         * Subclasses can override this method to do their own pre-stop work.  This method
+         * will always be called before the next transport's own stop() method or this filter's
+         * own afterNextIsStopped() method is called.
+         */
+        virtual void beforeNextIsStopped() {}
+
+        /**
+         * Subclasses can override this method to do their own stop work.  This method is
+         * always called after all the next transports have been stopped to prevent this
+         * transport from destroying resources needed by the lower level transports.
+         */
+        virtual void afterNextIsStopped() {}
+
+        /**
+         * Subclasses can override this method to do their own close work.  This method is
+         * always called after all the next transports have been closed to prevent this
+         * transport from destroying resources needed by the lower level transports.
+         */
+        virtual void doClose() {}
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.cpp
Sat Feb  9 21:03:23 2013
@@ -85,21 +85,17 @@ namespace correlator{
         // Sync object for accessing the request map.
         decaf::util::concurrent::Mutex mapMutex;
 
-        // Flag to indicate the closed state.
-        bool closed;
-
         // Indicates that an the filter is now unusable from some error.
         Pointer<Exception> priorError;
 
     public:
 
-        CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), closed(true), priorError(NULL)
{}
+        CorrelatorData() : nextCommandId(1), requestMap(), mapMutex(), priorError(NULL) {}
 
     };
 
 }}}
 
-
 ////////////////////////////////////////////////////////////////////////////////
 ResponseCorrelator::ResponseCorrelator(Pointer<Transport> next) : TransportFilter(next),
impl(new CorrelatorData) {
 }
@@ -120,13 +116,12 @@ ResponseCorrelator::~ResponseCorrelator(
 void ResponseCorrelator::oneway(const Pointer<Command> command) {
 
     try {
+
+        checkClosed();
+
         command->setCommandId(this->impl->nextCommandId.getAndIncrement());
         command->setResponseRequired(false);
 
-        if (this->impl->closed || next == NULL) {
-            throw IOException(__FILE__, __LINE__, "transport already closed");
-        }
-
         next->oneway(command);
     }
     AMQ_CATCH_RETHROW(UnsupportedOperationException)
@@ -141,6 +136,8 @@ Pointer<FutureResponse> ResponseCorrelat
 
     try {
 
+        checkClosed();
+
         command->setCommandId(this->impl->nextCommandId.getAndIncrement());
         command->setResponseRequired(true);
 
@@ -190,6 +187,8 @@ Pointer<Response> ResponseCorrelator::re
 
     try {
 
+        checkClosed();
+
         command->setCommandId(this->impl->nextCommandId.getAndIncrement());
         command->setResponseRequired(true);
 
@@ -240,6 +239,8 @@ Pointer<Response> ResponseCorrelator::re
 
     try {
 
+        checkClosed();
+
         command->setCommandId(this->impl->nextCommandId.getAndIncrement());
         command->setResponseRequired(true);
 
@@ -291,7 +292,7 @@ void ResponseCorrelator::onCommand(const
     // Let's see if the incoming command is a response, if not we just pass it along
     // and allow outstanding requests to keep waiting without stalling control commands.
     if (!command->isResponse()) {
-        fire(command);
+        TransportFilter::onCommand(command);
         return;
     }
 
@@ -316,45 +317,9 @@ void ResponseCorrelator::onCommand(const
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::start() {
-
-    try{
-
-        if (!this->impl->closed) {
-            return;
-        }
-
-        if (listener == NULL) {
-            throw IOException(__FILE__, __LINE__, "exceptionListener is invalid");
-        }
-
-        if (next == NULL) {
-            throw IOException(__FILE__, __LINE__, "next transport is NULL");
-        }
-
-        // Start the delegate transport object.
-        next->start();
-
-        // Mark it as open.
-        this->impl->closed = false;
-    }
-    AMQ_CATCH_RETHROW(IOException)
-    AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
-    AMQ_CATCHALL_THROW(IOException)
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::close() {
-
+void ResponseCorrelator::doClose() {
     try {
-
         dispose(Pointer<Exception>(new IOException(__FILE__, __LINE__, "Transport Stopped")));
-
-        if (!this->impl->closed && next != NULL) {
-            next->close();
-        }
-
-        this->impl->closed = true;
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
@@ -371,7 +336,7 @@ void ResponseCorrelator::onException(con
 void ResponseCorrelator::dispose(Pointer<Exception> error) {
 
     ArrayList<Pointer<FutureResponse> > requests;
-    synchronized(&this->impl->mapMutex){
+    synchronized(&this->impl->mapMutex) {
         if (this->impl->priorError == NULL) {
             this->impl->priorError = error;
             requests.ensureCapacity((int)this->impl->requestMap.size());

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/correlator/ResponseCorrelator.h
Sat Feb  9 21:03:23 2013
@@ -78,10 +78,6 @@ namespace correlator {
 
         virtual Pointer<Response> request(const Pointer<Command> command, unsigned
int timeout);
 
-        virtual void start();
-
-        virtual void close();
-
         /**
          * This is called in the context of the nested transport's reading thread.  In
          * the case of a response object, updates the request map and notifies those
@@ -103,6 +99,10 @@ namespace correlator {
          */
         virtual void onException(const decaf::lang::Exception& ex);
 
+    protected:
+
+        virtual void doClose();
+
     private:
 
         void dispose(Pointer<decaf::lang::Exception> ex);

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.cpp
Sat Feb  9 21:03:23 2013
@@ -271,9 +271,8 @@ void InactivityMonitor::setKeepAliveResp
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::start() {
+void InactivityMonitor::afterNextIsStarted() {
     try {
-        TransportFilter::start();
         startMonitorThreads();
     }
     AMQ_CATCH_RETHROW(IOException)
@@ -282,10 +281,9 @@ void InactivityMonitor::start() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::stop() {
+void InactivityMonitor::beforeNextIsStopped() {
     try {
         stopMonitorThreads();
-        TransportFilter::stop();
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
@@ -293,11 +291,9 @@ void InactivityMonitor::stop() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void InactivityMonitor::close() {
+void InactivityMonitor::doClose() {
     try {
-        setTransportListener(NULL);
         stopMonitorThreads();
-        TransportFilter::close();
     }
     AMQ_CATCH_RETHROW(IOException)
     AMQ_CATCH_EXCEPTION_CONVERT(Exception, IOException)
@@ -340,7 +336,7 @@ void InactivityMonitor::onCommand(const 
     } catch (Exception& ex) {
         this->members->inRead.set(false);
         ex.setMark(__FILE__, __LINE__);
-        throw ex;
+        throw;
     }
 }
 
@@ -375,9 +371,8 @@ void InactivityMonitor::oneway(const Poi
             } catch (Exception& ex) {
                 this->members->commandSent.set(true);
                 this->members->inWrite.set(false);
-
                 ex.setMark(__FILE__, __LINE__);
-                throw ex;
+                throw;
             }
         }
     }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/inactivity/InactivityMonitor.h
Sat Feb  9 21:03:23 2013
@@ -68,12 +68,6 @@ namespace inactivity {
 
     public: // TransportFilter Methods
 
-        virtual void start();
-
-        virtual void stop();
-
-        virtual void close();
-
         virtual void onException(const decaf::lang::Exception& ex);
 
         virtual void onCommand(const Pointer<Command> command);
@@ -98,6 +92,14 @@ namespace inactivity {
 
         void setInitialDelayTime(long long value) const;
 
+    protected:
+
+        virtual void afterNextIsStarted();
+
+        virtual void beforeNextIsStopped();
+
+        virtual void doClose();
+
     private:
 
         // Throttles read checking

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.cpp
Sat Feb  9 21:03:23 2013
@@ -114,16 +114,10 @@ TcpTransport::~TcpTransport() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::start() {
+void TcpTransport::beforeNextIsStarted() {
     try {
-
-        if (this->impl->closed.get()) {
-            throw IOException(__FILE__, __LINE__, "Transport is closed");
-        }
-
         if (this->impl->started.compareAndSet(false, true)) {
             connect();
-            TransportFilter::start();
         }
     }
     AMQ_CATCH_RETHROW(IOException)
@@ -132,21 +126,14 @@ void TcpTransport::start() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::stop() {
+void TcpTransport::afterNextIsStopped() {
     try {
-
-        if (this->impl->closed.get()) {
-            throw IOException(__FILE__, __LINE__, "Transport is closed");
-        }
-
+        // The IOTransport is now stopped, so we can safely close the socket
+        // and no asynchronous exceptions should be triggered.
         if (this->impl->started.compareAndSet(true, false)) {
-
-            // Close the socket.
             if (impl->socket.get() != NULL) {
                 impl->socket->close();
             }
-
-            TransportFilter::stop();
         }
     }
     AMQ_CATCH_RETHROW(IOException)
@@ -155,19 +142,12 @@ void TcpTransport::stop() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void TcpTransport::close() {
-
+void TcpTransport::doClose() {
     try {
-
         if (this->impl->closed.compareAndSet(false, true)) {
-            this->setTransportListener(NULL);
-
-            // Close the socket.
             if (impl->socket.get() != NULL) {
                 impl->socket->close();
             }
-
-            TransportFilter::close();
         }
     }
     AMQ_CATCH_RETHROW(IOException)

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/tcp/TcpTransport.h
Sat Feb  9 21:03:23 2013
@@ -99,12 +99,6 @@ namespace tcp {
 
     public: // Transport Methods
 
-        virtual void start();
-
-        virtual void stop();
-
-        virtual void close();
-
         virtual bool isFaultTolerant() const {
             return false;
         }
@@ -115,6 +109,12 @@ namespace tcp {
 
     protected:
 
+        virtual void beforeNextIsStarted();
+
+        virtual void afterNextIsStopped();
+
+        virtual void doClose();
+
         /**
          * Creates a Socket and configures it before attempting to connect to the location
specified
          * by the URI passed in.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
Sat Feb  9 21:03:23 2013
@@ -43,8 +43,7 @@ OpenWireFormatNegotiator::OpenWireFormat
     firstTime(true),
     wireInfoSentDownLatch(1),
     readyCountDownLatch(1),
-    openWireFormat(wireFormat),
-    closed(true) {
+    openWireFormat(wireFormat) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -60,9 +59,7 @@ void OpenWireFormatNegotiator::oneway(co
 
     try {
 
-        if (closed || next == NULL) {
-            throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::oneway - transport already closed");
-        }
+        checkClosed();
 
         if (!readyCountDownLatch.await(negotiationTimeout)) {
             throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::oneway"
@@ -83,9 +80,7 @@ Pointer<Response> OpenWireFormatNegotiat
 
     try {
 
-        if (closed || next == NULL) {
-            throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::request - transport already closed");
-        }
+        checkClosed();
 
         if (!readyCountDownLatch.await(negotiationTimeout)) {
             throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::request"
@@ -106,9 +101,7 @@ Pointer<Response> OpenWireFormatNegotiat
 
     try {
 
-        if (closed || next == NULL) {
-            throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::request - transport already closed");
-        }
+        checkClosed();
 
         if (!readyCountDownLatch.await(negotiationTimeout)) {
             throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::request"
@@ -143,44 +136,29 @@ void OpenWireFormatNegotiator::onCommand
             readyCountDownLatch.countDown();
         } catch (exceptions::ActiveMQException& ex) {
             readyCountDownLatch.countDown();
-            fire(ex);
+            TransportFilter::onCommand(command);
         }
     }
 
     // Send along to the next interested party.
-    fire(command);
+    TransportFilter::onCommand(command);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void OpenWireFormatNegotiator::onException(const decaf::lang::Exception& ex) {
     readyCountDownLatch.countDown();
-    fire(ex);
+    TransportFilter::onException(ex);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::start() {
-
-    if (!closed) {
-        return;
-    }
-
-    if (listener == NULL) {
-        throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::start - TransportListener is invalid");
-    }
-
-    if (next == NULL) {
-        throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::start - next transport is NULL");
-    }
-
-    if (openWireFormat == NULL) {
-        throw IOException(__FILE__, __LINE__, "OpenWireFormatNegotiator::start - openWireFormat is NULL");
-    }
+void OpenWireFormatNegotiator::afterNextIsStopped() {
+    readyCountDownLatch.countDown();
+}
 
-    // Start the delegate transport object.
-    next->start();
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::afterNextIsStarted() {
 
     if (firstTime.compareAndSet(true, false)) {
-
         try {
 
             // We first send the WireFormat that we'd prefer.
@@ -196,23 +174,4 @@ void OpenWireFormatNegotiator::start() {
             throw;
         }
     }
-
-    // Mark it as open.
-    closed = false;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::close() {
-
-    try {
-
-        if (!closed && next != NULL) {
-            next->close();
-        }
-
-        closed = true;
-    }
-    AMQ_CATCH_RETHROW(IOException)
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException)
-    AMQ_CATCHALL_THROW(IOException)
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h?rev=1444434&r1=1444433&r2=1444434&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h Sat Feb  9 21:03:23 2013
@@ -58,11 +58,6 @@ namespace openwire{
          */
         OpenWireFormat* openWireFormat;
 
-        /**
-         * Indicates Transport has shut down
-         */
-        bool closed;
-
     private:
 
         OpenWireFormatNegotiator(const OpenWireFormatNegotiator&);
@@ -79,75 +74,23 @@ namespace openwire{
 
         virtual ~OpenWireFormatNegotiator();
 
-        /**
-         * Sends a one-way command.  Does not wait for any response from the
-         * broker.
-         * First waits for the WireFormatInfo exchange to happen so that we
-         * know how to encode out-bound data.
-         * @param command the command to be sent.
-         * @throws IOException if an exception occurs during writing of
-         * the command.
-         * @throws UnsupportedOperationException if this method is not implemented
-         * by this transport.
-         */
         virtual void oneway(const Pointer<commands::Command> command);
 
-        /**
-         * Sends the given request to the server and waits for the response.
-         * First waits for the WireFormatInfo exchange to happen so that we
-         * know how to encode out-bound data.
-         * @param command The request to send.
-         * @return the response from the server.
-         * @throws IOException if an error occurs with the request.
-         */
         virtual Pointer<commands::Response> request(const Pointer<commands::Command> command);
 
-        /**
-         * Sends the given request to the server and waits for the response.
-         * First waits for the WireFormatInfo exchange to happen so that we
-         * know how to encode out-bound data.
-         * @param command The request to send.
-         * @param timeout The time to wait for the response.
-         * @return the response from the server.
-         * @throws IOException if an error occurs with the request.
-         */
         virtual Pointer<commands::Response> request(const Pointer<commands::Command> command, unsigned int timeout);
 
-        /**
-         * This is called in the context of the nested transport's
-         * reading thread.  In the case of a response object,
-         * updates the request map and notifies those waiting on the
-         * response.  Non-response messages are just delegated to
-         * the command listener.
-         * @param command the received from the nested transport.
-         */
+    public:
+
         virtual void onCommand(const Pointer<commands::Command> command);
 
-        /**
-         * Event handler for an exception from a command transport.
-         * @param source The source of the exception
-         * @param ex The exception.
-         */
         virtual void onException(const decaf::lang::Exception& ex);
 
-        /**
-         * Starts this transport object and creates the thread for
-         * polling on the input stream for commands.  If this object
-         * has been closed, throws an exception.  Before calling start,
-         * the caller must set the IO streams and the reader and writer
-         * objects.
-         * @throws IOException if an error occurs or if this transport
-         * has already been closed.
-         */
-        virtual void start();
+    protected:
 
-        /**
-         * Stops the polling thread and closes the streams.  This can
-         * be called explicitly, but is also called in the destructor. Once
-         * this object has been closed, it cannot be restarted.
-         * @throws IOException if errors occur.
-         */
-        virtual void close();
+        virtual void afterNextIsStarted();
+
+        virtual void afterNextIsStopped();
 
     };
 



Mime
View raw message