activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r498786 - in /incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport: ResponseCorrelator.cpp ResponseCorrelator.h
Date Mon, 22 Jan 2007 20:35:41 GMT
Author: nmittler
Date: Mon Jan 22 12:35:40 2007
New Revision: 498786

URL: http://svn.apache.org/viewvc?view=rev&rev=498786
Log:
[AMQCPP-40] - fix for warning in msvc for no public symbols in ResponseCorrelator

Modified:
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp
    incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp?view=diff&rev=498786&r1=498785&r2=498786
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.cpp
Mon Jan 22 12:35:40 2007
@@ -16,3 +16,246 @@
  */
 
 #include "ResponseCorrelator.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int ResponseCorrelator::getNextCommandId() throw ( exceptions::ActiveMQException
){
+    
+    try{
+        synchronized( &commandIdMutex ){
+            return ++nextCommandId;
+        }
+        
+        // Should never get here, but some compilers aren't
+        // smart enough to figure out we'll never get here.
+        return 0;
+    }
+    AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
+}
+       
+//////////////////////////////////////////////////////////////////////////////// 
+ResponseCorrelator::ResponseCorrelator( Transport* next, const bool own )
+:
+    TransportFilter( next, own )
+{            
+    nextCommandId = 0;
+            
+            // Default max response wait time to 3 seconds.
+    maxResponseWaitTime = 3000;
+    
+    // Start in the closed state. 
+    closed = true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ResponseCorrelator::~ResponseCorrelator(){
+    
+    // Close the transport and destroy it.
+    close();
+
+    // Don't do anything with the future responses -
+    // they should be cleaned up by each requester.                        
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned long ResponseCorrelator::getMaxResponseWaitTime() const{
+    return maxResponseWaitTime;
+}           
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long milliseconds ){
+    maxResponseWaitTime = milliseconds;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::oneway( Command* command ) 
+    throw( CommandIOException, exceptions::UnsupportedOperationException )
+{
+    
+    try{
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( false );
+        
+        if( closed || next == NULL ){
+            throw CommandIOException( __FILE__, __LINE__,
+                "transport already closed" );
+        }
+        
+        next->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* ResponseCorrelator::request( Command* command ) 
+    throw( CommandIOException, exceptions::UnsupportedOperationException )
+{
+    
+    try{
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( true );
+        
+        // Add a future response object to the map indexed by this
+        // command id.
+        FutureResponse* futureResponse = 
+           new FutureResponse();
+          
+        synchronized( &mapMutex ){
+            requestMap[command->getCommandId()] = futureResponse;
+        }                                
+        
+        // Wait to be notified of the response via the futureResponse
+        // object.
+        Response* response = NULL;
+        synchronized( futureResponse ){
+            
+            // Send the request.
+            next->oneway( command );
+            
+            // Wait for the response to come in.
+            futureResponse->wait( maxResponseWaitTime );                             
         
+            
+            // Get the response.
+            response = futureResponse->getResponse();                    
+        }                              
+                        
+        // Perform cleanup on the map.
+        synchronized( &mapMutex ){
+            
+            // We've done our waiting - get this thing out
+            // of the map.
+            requestMap.erase( command->getCommandId() );
+            
+            // Destroy the futureResponse.  It is safe to
+            // do this now because the other thread only
+            // accesses the futureResponse within a lock on
+            // the map.
+            delete futureResponse;
+            futureResponse = NULL;
+        }
+            
+        if( response == NULL ){                                        
+            
+            throw CommandIOException( __FILE__, __LINE__,
+                "response from futureResponse was invalid" );
+        }
+        
+        return response;                
+    }
+    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::onCommand( Command* command ){
+    
+    // Let's see if the incoming command is a response.
+    Response* response = 
+       dynamic_cast<Response*>( command );
+       
+    if( response == NULL ){
+        
+        // It's a non-response - just notify the listener.
+        fire( command );
+        return;
+    }
+        
+    // It is a response - let's correlate ...
+    synchronized( &mapMutex ){
+        
+        // Look the future request up based on the correlation id.
+        std::map<unsigned int, FutureResponse*>::iterator iter =
+            requestMap.find( response->getCorrelationId() );
+        if( iter == requestMap.end() ){
+            
+            // This is not terrible - just log it.
+            printf("ResponseCorrelator::onCommand() - received unknown response for request:
%d\n", 
+                response->getCorrelationId() );
+            return;
+        }
+        
+        // Get the future response (if it's in the map, it's not NULL).
+        FutureResponse* futureResponse = NULL;
+        futureResponse = iter->second;
+        
+        // If it's an exception response, notify the exception listener.
+        ExceptionResponse* exResp = 
+            dynamic_cast<ExceptionResponse*>( response );
+        if( exResp != NULL ){
+            const BrokerError* error = exResp->getException();
+            fire( *error );
+        }
+        
+        synchronized( futureResponse ){
+            
+            // Set the response property in the future response.
+            futureResponse->setResponse( response );
+            
+            // Notify all waiting for this response.
+            futureResponse->notifyAll();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::setCommandListener( CommandListener* listener ){
+    this->commandlistener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::setTransportExceptionListener( 
+    TransportExceptionListener* listener )
+{
+    this->exceptionListener = listener;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::start() throw( cms::CMSException ){
+    
+    /**
+     * We're already started.
+     */
+    if( !closed ){
+        return;
+    }
+    
+    if( commandlistener == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "commandListener is invalid" );
+    }
+    
+    if( exceptionListener == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "exceptionListener is invalid" );
+    }
+    
+    if( next == NULL ){
+        throw exceptions::ActiveMQException( __FILE__, __LINE__,
+            "next transport is NULL" );
+    }
+    
+    // Start the delegate transport object.
+    next->start();
+    
+    // Mark it as open.
+    closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::close() throw( cms::CMSException ){
+    
+    if( !closed && next != NULL ){
+        next->close();
+    }
+    
+    closed = true;
+}
+

Modified: incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h?view=diff&rev=498786&r1=498785&r2=498786
==============================================================================
--- incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h
(original)
+++ incubator/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/transport/ResponseCorrelator.h
Mon Jan 22 12:35:40 2007
@@ -75,20 +75,7 @@
         /**
          * Returns the next available command id.
          */
-        unsigned int getNextCommandId() throw ( exceptions::ActiveMQException ){
-            
-            try{
-                synchronized( &commandIdMutex ){
-                    return ++nextCommandId;
-                }
-                
-                // Should never get here, but some compilers aren't
-                // smart enough to figure out we'll never get here.
-                return 0;
-            }
-            AMQ_CATCH_RETHROW( exceptions::ActiveMQException )
-            AMQ_CATCHALL_THROW( exceptions::ActiveMQException )
-        }
+        unsigned int getNextCommandId() throw ( exceptions::ActiveMQException );
          
     public:
   
@@ -97,43 +84,21 @@
          * @param next the next transport in the chain
          * @param own indicates if this transport owns the next
          */
-        ResponseCorrelator( Transport* next, const bool own = true )
-        :
-            TransportFilter( next, own )
-        {            
-            nextCommandId = 0;
-            
-            // Default max response wait time to 3 seconds.
-            maxResponseWaitTime = 3000;
-            
-            // Start in the closed state. 
-            closed = true;
-        }
+        ResponseCorrelator( Transport* next, const bool own = true );
         
-        virtual ~ResponseCorrelator(){
-            
-            // Close the transport and destroy it.
-            close();
-        
-            // Don't do anything with the future responses -
-            // they should be cleaned up by each requester.                        
-        }
+        virtual ~ResponseCorrelator();
         
         /**
          * Gets the maximum wait time for a response in milliseconds.
          * @return max time that a response can take
          */
-        virtual unsigned long getMaxResponseWaitTime() const{
-            return maxResponseWaitTime;
-        }           
+        virtual unsigned long getMaxResponseWaitTime() const;
         
         /**
          * Sets the maximum wait time for a response in milliseconds.
          * @param milliseconds the max time that a response can take.
          */
-        virtual void setMaxResponseWaitTime( const unsigned long milliseconds ){
-            maxResponseWaitTime = milliseconds;
-        }
+        virtual void setMaxResponseWaitTime( const unsigned long milliseconds );
         
         /**
          * Sends a one-way command.  Does not wait for any response from the
@@ -145,25 +110,7 @@
          * by this transport.
          */
         virtual void oneway( Command* command ) 
-            throw( CommandIOException, exceptions::UnsupportedOperationException )
-        {
-            
-            try{
-                command->setCommandId( getNextCommandId() );
-                command->setResponseRequired( false );
-                
-                if( closed || next == NULL ){
-                    throw CommandIOException( __FILE__, __LINE__,
-                        "transport already closed" );
-                }
-                
-                next->oneway( command );
-            }
-            AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
-            AMQ_CATCH_RETHROW( CommandIOException )
-            AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException
)
-            AMQ_CATCHALL_THROW( CommandIOException )
-        }
+            throw( CommandIOException, exceptions::UnsupportedOperationException );
         
         /**
          * Sends the given request to the server and waits for the response.
@@ -173,65 +120,7 @@
          * @throws CommandIOException if an error occurs with the request.
          */
         virtual Response* request( Command* command ) 
-            throw( CommandIOException, exceptions::UnsupportedOperationException )
-        {
-            
-            try{
-                command->setCommandId( getNextCommandId() );
-                command->setResponseRequired( true );
-                
-                // Add a future response object to the map indexed by this
-                // command id.
-                FutureResponse* futureResponse = 
-                   new FutureResponse();
-                  
-                synchronized( &mapMutex ){
-                    requestMap[command->getCommandId()] = futureResponse;
-                }                                
-                
-                // Wait to be notified of the response via the futureResponse
-                // object.
-                Response* response = NULL;
-                synchronized( futureResponse ){
-                    
-                    // Send the request.
-                    next->oneway( command );
-                    
-                    // Wait for the response to come in.
-                    futureResponse->wait( maxResponseWaitTime );                     
                 
-                    
-                    // Get the response.
-                    response = futureResponse->getResponse();                    
-                }                              
-                                
-                // Perform cleanup on the map.
-                synchronized( &mapMutex ){
-                    
-                    // We've done our waiting - get this thing out
-                    // of the map.
-                    requestMap.erase( command->getCommandId() );
-                    
-                    // Destroy the futureResponse.  It is safe to
-                    // do this now because the other thread only
-                    // accesses the futureResponse within a lock on
-                    // the map.
-                    delete futureResponse;
-                    futureResponse = NULL;
-                }
-                    
-                if( response == NULL ){                                        
-                    
-                    throw CommandIOException( __FILE__, __LINE__,
-                        "response from futureResponse was invalid" );
-                }
-                
-                return response;                
-            }
-            AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
-            AMQ_CATCH_RETHROW( CommandIOException )
-            AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException
)
-            AMQ_CATCHALL_THROW( CommandIOException )
-        }
+            throw( CommandIOException, exceptions::UnsupportedOperationException );
         
         /**
          * This is called in the context of the nested transport's
@@ -241,73 +130,20 @@
          * the command listener.
          * @param command the received from the nested transport.
          */
-        virtual void onCommand( Command* command ){
-            
-            // Let's see if the incoming command is a response.
-            Response* response = 
-               dynamic_cast<Response*>( command );
-               
-            if( response == NULL ){
-                
-                // It's a non-response - just notify the listener.
-                fire( command );
-                return;
-            }
-                
-            // It is a response - let's correlate ...
-            synchronized( &mapMutex ){
-                
-                // Look the future request up based on the correlation id.
-                std::map<unsigned int, FutureResponse*>::iterator iter =
-                    requestMap.find( response->getCorrelationId() );
-                if( iter == requestMap.end() ){
-                    
-                    // This is not terrible - just log it.
-                    printf("ResponseCorrelator::onCommand() - received unknown response for
request: %d\n", 
-                        response->getCorrelationId() );
-                    return;
-                }
-                
-                // Get the future response (if it's in the map, it's not NULL).
-                FutureResponse* futureResponse = NULL;
-                futureResponse = iter->second;
-                
-                // If it's an exception response, notify the exception listener.
-                ExceptionResponse* exResp = 
-                    dynamic_cast<ExceptionResponse*>( response );
-                if( exResp != NULL ){
-                    const BrokerError* error = exResp->getException();
-                    fire( *error );
-                }
-                
-                synchronized( futureResponse ){
-                    
-                    // Set the response property in the future response.
-                    futureResponse->setResponse( response );
-                    
-                    // Notify all waiting for this response.
-                    futureResponse->notifyAll();
-                }
-            }
-        }
+        virtual void onCommand( Command* command );
         
         /**
          * Assigns the command listener for non-response commands.
          * @param listener the listener.
          */
-        virtual void setCommandListener( CommandListener* listener ){
-            this->commandlistener = listener;
-        }
+        virtual void setCommandListener( CommandListener* listener );
         
         /**
          * Sets the observer of asynchronous exceptions from this transport.
          * @param listener the listener of transport exceptions.
          */
         virtual void setTransportExceptionListener( 
-            TransportExceptionListener* listener )
-        {
-            this->exceptionListener = listener;
-        }
+            TransportExceptionListener* listener );
         
         /**
          * Starts this transport object and creates the thread for
@@ -318,36 +154,7 @@
          * @throws CMSException if an error occurs or if this transport
          * has already been closed.
          */
-        virtual void start() throw( cms::CMSException ){
-            
-            /**
-             * We're already started.
-             */
-            if( !closed ){
-                return;
-            }
-            
-            if( commandlistener == NULL ){
-                throw exceptions::ActiveMQException( __FILE__, __LINE__,
-                    "commandListener is invalid" );
-            }
-            
-            if( exceptionListener == NULL ){
-                throw exceptions::ActiveMQException( __FILE__, __LINE__,
-                    "exceptionListener is invalid" );
-            }
-            
-            if( next == NULL ){
-                throw exceptions::ActiveMQException( __FILE__, __LINE__,
-                    "next transport is NULL" );
-            }
-            
-            // Start the delegate transport object.
-            next->start();
-            
-            // Mark it as open.
-            closed = false;
-        }
+        virtual void start() throw( cms::CMSException );
         
         /**
          * Stops the polling thread and closes the streams.  This can
@@ -355,14 +162,7 @@
          * this object has been closed, it cannot be restarted.
          * @throws CMSException if errors occur.
          */
-        virtual void close() throw( cms::CMSException ){
-            
-            if( !closed && next != NULL ){
-                next->close();
-            }
-            
-            closed = true;
-        }
+        virtual void close() throw( cms::CMSException );
         
     };
     



Mime
View raw message