Author: nmittler Date: Mon Jan 22 12:35:40 2007 New Revision: 498786 URL: http://svn.apache.org/viewvc?view=rev&rev=498786 Log: [AMQCPP-40] - fix for warning in msvc for no public symbols in ResponseCorrelator Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp?view=diff&rev=498786&r1=498785&r2=498786 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp Mon Jan 22 12:35:40 2007 @@ -16,3 +16,246 @@ */ #include "ResponseCorrelator.h" + +using namespace activemq; +using namespace activemq::transport; + +//////////////////////////////////////////////////////////////////////////////// +unsigned int ResponseCorrelator::getNextCommandId() throw ( exceptions::ActiveMQException ){ + + try{ + synchronized( &commandIdMutex ){ + return ++nextCommandId; + } + + // Should never get here, but some compilers aren't + // smart enough to figure out we'll never get here. + return 0; + } + AMQ_CATCH_RETHROW( exceptions::ActiveMQException ) + AMQ_CATCHALL_THROW( exceptions::ActiveMQException ) +} + +//////////////////////////////////////////////////////////////////////////////// +ResponseCorrelator::ResponseCorrelator( Transport* next, const bool own ) +: + TransportFilter( next, own ) +{ + nextCommandId = 0; + + // Default max response wait time to 3 seconds. + maxResponseWaitTime = 3000; + + // Start in the closed state. 
+ closed = true; +} + +//////////////////////////////////////////////////////////////////////////////// +ResponseCorrelator::~ResponseCorrelator(){ + + // Close the transport and destroy it. + close(); + + // Don't do anything with the future responses - + // they should be cleaned up by each requester. +} + +//////////////////////////////////////////////////////////////////////////////// +unsigned long ResponseCorrelator::getMaxResponseWaitTime() const{ + return maxResponseWaitTime; +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long milliseconds ){ + maxResponseWaitTime = milliseconds; +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::oneway( Command* command ) + throw( CommandIOException, exceptions::UnsupportedOperationException ) +{ + + try{ + command->setCommandId( getNextCommandId() ); + command->setResponseRequired( false ); + + if( closed || next == NULL ){ + throw CommandIOException( __FILE__, __LINE__, + "transport already closed" ); + } + + next->oneway( command ); + } + AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException ) + AMQ_CATCH_RETHROW( CommandIOException ) + AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException ) + AMQ_CATCHALL_THROW( CommandIOException ) +} + +//////////////////////////////////////////////////////////////////////////////// +Response* ResponseCorrelator::request( Command* command ) + throw( CommandIOException, exceptions::UnsupportedOperationException ) +{ + + try{ + command->setCommandId( getNextCommandId() ); + command->setResponseRequired( true ); + + // Add a future response object to the map indexed by this + // command id. 
+ FutureResponse* futureResponse = + new FutureResponse(); + + synchronized( &mapMutex ){ + requestMap[command->getCommandId()] = futureResponse; + } + + // Wait to be notified of the response via the futureResponse + // object. + Response* response = NULL; + synchronized( futureResponse ){ + + // Send the request. + next->oneway( command ); + + // Wait for the response to come in. + futureResponse->wait( maxResponseWaitTime ); + + // Get the response. + response = futureResponse->getResponse(); + } + + // Perform cleanup on the map. + synchronized( &mapMutex ){ + + // We've done our waiting - get this thing out + // of the map. + requestMap.erase( command->getCommandId() ); + + // Destroy the futureResponse. It is safe to + // do this now because the other thread only + // accesses the futureResponse within a lock on + // the map. + delete futureResponse; + futureResponse = NULL; + } + + if( response == NULL ){ + + throw CommandIOException( __FILE__, __LINE__, + "response from futureResponse was invalid" ); + } + + return response; + } + AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException ) + AMQ_CATCH_RETHROW( CommandIOException ) + AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException ) + AMQ_CATCHALL_THROW( CommandIOException ) +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::onCommand( Command* command ){ + + // Let's see if the incoming command is a response. + Response* response = + dynamic_cast<Response*>( command ); + + if( response == NULL ){ + + // It's a non-response - just notify the listener. + fire( command ); + return; + } + + // It is a response - let's correlate ... + synchronized( &mapMutex ){ + + // Look the future request up based on the correlation id. + std::map<unsigned int, FutureResponse*>::iterator iter = + requestMap.find( response->getCorrelationId() ); + if( iter == requestMap.end() ){ + + // This is not terrible - just log it. 
+ printf("ResponseCorrelator::onCommand() - received unknown response for request: %d\n", + response->getCorrelationId() ); + return; + } + + // Get the future response (if it's in the map, it's not NULL). + FutureResponse* futureResponse = NULL; + futureResponse = iter->second; + + // If it's an exception response, notify the exception listener. + ExceptionResponse* exResp = + dynamic_cast<ExceptionResponse*>( response ); + if( exResp != NULL ){ + const BrokerError* error = exResp->getException(); + fire( *error ); + } + + synchronized( futureResponse ){ + + // Set the response property in the future response. + futureResponse->setResponse( response ); + + // Notify all waiting for this response. + futureResponse->notifyAll(); + } + } +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::setCommandListener( CommandListener* listener ){ + this->commandlistener = listener; +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::setTransportExceptionListener( + TransportExceptionListener* listener ) +{ + this->exceptionListener = listener; +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::start() throw( cms::CMSException ){ + + /** + * We're already started. + */ + if( !closed ){ + return; + } + + if( commandlistener == NULL ){ + throw exceptions::ActiveMQException( __FILE__, __LINE__, + "commandListener is invalid" ); + } + + if( exceptionListener == NULL ){ + throw exceptions::ActiveMQException( __FILE__, __LINE__, + "exceptionListener is invalid" ); + } + + if( next == NULL ){ + throw exceptions::ActiveMQException( __FILE__, __LINE__, + "next transport is NULL" ); + } + + // Start the delegate transport object. + next->start(); + + // Mark it as open. 
+ closed = false; +} + +//////////////////////////////////////////////////////////////////////////////// +void ResponseCorrelator::close() throw( cms::CMSException ){ + + if( !closed && next != NULL ){ + next->close(); + } + + closed = true; +} + Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h?view=diff&rev=498786&r1=498785&r2=498786 ============================================================================== --- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h (original) +++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h Mon Jan 22 12:35:40 2007 @@ -75,20 +75,7 @@ /** * Returns the next available command id. */ - unsigned int getNextCommandId() throw ( exceptions::ActiveMQException ){ - - try{ - synchronized( &commandIdMutex ){ - return ++nextCommandId; - } - - // Should never get here, but some compilers aren't - // smart enough to figure out we'll never get here. - return 0; - } - AMQ_CATCH_RETHROW( exceptions::ActiveMQException ) - AMQ_CATCHALL_THROW( exceptions::ActiveMQException ) - } + unsigned int getNextCommandId() throw ( exceptions::ActiveMQException ); public: @@ -97,43 +84,21 @@ * @param next the next transport in the chain * @param own indicates if this transport owns the next */ - ResponseCorrelator( Transport* next, const bool own = true ) - : - TransportFilter( next, own ) - { - nextCommandId = 0; - - // Default max response wait time to 3 seconds. - maxResponseWaitTime = 3000; - - // Start in the closed state. - closed = true; - } + ResponseCorrelator( Transport* next, const bool own = true ); - virtual ~ResponseCorrelator(){ - - // Close the transport and destroy it. 
- close(); - - // Don't do anything with the future responses - - // they should be cleaned up by each requester. - } + virtual ~ResponseCorrelator(); /** * Gets the maximum wait time for a response in milliseconds. * @return max time that a response can take */ - virtual unsigned long getMaxResponseWaitTime() const{ - return maxResponseWaitTime; - } + virtual unsigned long getMaxResponseWaitTime() const; /** * Sets the maximum wait time for a response in milliseconds. * @param milliseconds the max time that a response can take. */ - virtual void setMaxResponseWaitTime( const unsigned long milliseconds ){ - maxResponseWaitTime = milliseconds; - } + virtual void setMaxResponseWaitTime( const unsigned long milliseconds ); /** * Sends a one-way command. Does not wait for any response from the @@ -145,25 +110,7 @@ * by this transport. */ virtual void oneway( Command* command ) - throw( CommandIOException, exceptions::UnsupportedOperationException ) - { - - try{ - command->setCommandId( getNextCommandId() ); - command->setResponseRequired( false ); - - if( closed || next == NULL ){ - throw CommandIOException( __FILE__, __LINE__, - "transport already closed" ); - } - - next->oneway( command ); - } - AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException ) - AMQ_CATCH_RETHROW( CommandIOException ) - AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException ) - AMQ_CATCHALL_THROW( CommandIOException ) - } + throw( CommandIOException, exceptions::UnsupportedOperationException ); /** * Sends the given request to the server and waits for the response. @@ -173,65 +120,7 @@ * @throws CommandIOException if an error occurs with the request. */ virtual Response* request( Command* command ) - throw( CommandIOException, exceptions::UnsupportedOperationException ) - { - - try{ - command->setCommandId( getNextCommandId() ); - command->setResponseRequired( true ); - - // Add a future response object to the map indexed by this - // command id. 
- FutureResponse* futureResponse = - new FutureResponse(); - - synchronized( &mapMutex ){ - requestMap[command->getCommandId()] = futureResponse; - } - - // Wait to be notified of the response via the futureResponse - // object. - Response* response = NULL; - synchronized( futureResponse ){ - - // Send the request. - next->oneway( command ); - - // Wait for the response to come in. - futureResponse->wait( maxResponseWaitTime ); - - // Get the response. - response = futureResponse->getResponse(); - } - - // Perform cleanup on the map. - synchronized( &mapMutex ){ - - // We've done our waiting - get this thing out - // of the map. - requestMap.erase( command->getCommandId() ); - - // Destroy the futureResponse. It is safe to - // do this now because the other thread only - // accesses the futureResponse within a lock on - // the map. - delete futureResponse; - futureResponse = NULL; - } - - if( response == NULL ){ - - throw CommandIOException( __FILE__, __LINE__, - "response from futureResponse was invalid" ); - } - - return response; - } - AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException ) - AMQ_CATCH_RETHROW( CommandIOException ) - AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException ) - AMQ_CATCHALL_THROW( CommandIOException ) - } + throw( CommandIOException, exceptions::UnsupportedOperationException ); /** * This is called in the context of the nested transport's @@ -241,73 +130,20 @@ * the command listener. * @param command the received from the nested transport. */ - virtual void onCommand( Command* command ){ - - // Let's see if the incoming command is a response. - Response* response = - dynamic_cast<Response*>( command ); - - if( response == NULL ){ - - // It's a non-response - just notify the listener. - fire( command ); - return; - } - - // It is a response - let's correlate ... - synchronized( &mapMutex ){ - - // Look the future request up based on the correlation id. 
- std::map<unsigned int, FutureResponse*>::iterator iter = - requestMap.find( response->getCorrelationId() ); - if( iter == requestMap.end() ){ - - // This is not terrible - just log it. - printf("ResponseCorrelator::onCommand() - received unknown response for request: %d\n", - response->getCorrelationId() ); - return; - } - - // Get the future response (if it's in the map, it's not NULL). - FutureResponse* futureResponse = NULL; - futureResponse = iter->second; - - // If it's an exception response, notify the exception listener. - ExceptionResponse* exResp = - dynamic_cast<ExceptionResponse*>( response ); - if( exResp != NULL ){ - const BrokerError* error = exResp->getException(); - fire( *error ); - } - - synchronized( futureResponse ){ - - // Set the response property in the future response. - futureResponse->setResponse( response ); - - // Notify all waiting for this response. - futureResponse->notifyAll(); - } - } - } + virtual void onCommand( Command* command ); /** * Assigns the command listener for non-response commands. * @param listener the listener. */ - virtual void setCommandListener( CommandListener* listener ){ - this->commandlistener = listener; - } + virtual void setCommandListener( CommandListener* listener ); /** * Sets the observer of asynchronous exceptions from this transport. * @param listener the listener of transport exceptions. */ virtual void setTransportExceptionListener( - TransportExceptionListener* listener ) - { - this->exceptionListener = listener; - } + TransportExceptionListener* listener ); /** * Starts this transport object and creates the thread for @@ -318,36 +154,7 @@ * @throws CMSException if an error occurs or if this transport * has already been closed. */ - virtual void start() throw( cms::CMSException ){ - - /** - * We're already started. 
- */ - if( !closed ){ - return; - } - - if( commandlistener == NULL ){ - throw exceptions::ActiveMQException( __FILE__, __LINE__, - "commandListener is invalid" ); - } - - if( exceptionListener == NULL ){ - throw exceptions::ActiveMQException( __FILE__, __LINE__, - "exceptionListener is invalid" ); - } - - if( next == NULL ){ - throw exceptions::ActiveMQException( __FILE__, __LINE__, - "next transport is NULL" ); - } - - // Start the delegate transport object. - next->start(); - - // Mark it as open. - closed = false; - } + virtual void start() throw( cms::CMSException ); /** * Stops the polling thread and closes the streams. This can @@ -355,14 +162,7 @@ * this object has been closed, it cannot be restarted. * @throws CMSException if errors occur. */ - virtual void close() throw( cms::CMSException ){ - - if( !closed && next != NULL ){ - next->close(); - } - - closed = true; - } + virtual void close() throw( cms::CMSException ); };