activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r686632 - in /activemq/activemq-cpp/trunk/src: examples/consumers/ examples/producers/ main/activemq/connector/ main/activemq/connector/openwire/ main/activemq/connector/stomp/ main/activemq/core/ main/activemq/transport/ main/activemq/tran...
Date Sun, 17 Aug 2008 18:57:35 GMT
Author: tabish
Date: Sun Aug 17 11:57:34 2008
New Revision: 686632

URL: http://svn.apache.org/viewvc?rev=686632&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-189

Adding timed and un-timed requests to the Response correlator code via the Future response object, refactored FutureReponse to use a Latch to wait for the response.

This fixes the first part of Adding Producer Flow control, clients will now block waiting for the broker when using sync style sends.  The async transport will keep running right now consuming all client memory if allowed to run long enough.  

Modified:
    activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
    activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
    activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
    activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h

Modified: activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/examples/consumers/SimpleAsyncConsumer.cpp Sun Aug 17 11:57:34 2008
@@ -211,11 +211,11 @@
     //
     std::string brokerURI =
         "tcp://127.0.0.1:61616"
-        "?wireFormat=openwire"
-        "&transport.useAsyncSend=true"
+        "?wireFormat=openwire";
+//        "&transport.useAsyncSend=true"
 //        "&transport.commandTracingEnabled=true"
 //        "&transport.tcpTracingEnabled=true";
-        "&wireFormat.tightEncodingEnabled=true";
+//        "&wireFormat.tightEncodingEnabled=true";
 
     //============================================================
     // This is the Destination Name and URI options.  Use this to

Modified: activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/examples/producers/SimpleProducer.cpp Sun Aug 17 11:57:34 2008
@@ -190,15 +190,16 @@
     std::string brokerURI =
         "tcp://127.0.0.1:61616"
         "?wireFormat=openwire"
-        "&transport.useAsyncSend=true"
+        "&soKeepAlive=true";
+//        "&transport.useAsyncSend=true"
 //        "&transport.commandTracingEnabled=true"
 //        "&transport.tcpTracingEnabled=true";
-        "&wireFormat.tightEncodingEnabled=true";
+//        "&wireFormat.tightEncodingEnabled=true";
 
     //============================================================
     // Total number of messages for this producer to send.
     //============================================================
-    unsigned int numMessages = 2000;
+    unsigned int numMessages = 50000;
 
     //============================================================
     // This is the Destination Name and URI options.  Use this to

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/ProducerInfo.h Sun Aug 17 11:57:34 2008
@@ -87,6 +87,21 @@
          */
         virtual bool isDisableMessageId() const = 0;
 
+        /**
+         * Gets the set send timeout for messages from this producer, a value of
+         * zero indicates that the Producer should wait forever for a response from
+         * the broker on the send.
+         * @return default time to wait for broker response to a message being sent.
+         */
+        virtual unsigned int getSendTimeout() const = 0;
+
+        /**
+         * Sets the time to wait for the broker to respond to a message being sent
+         * @param timeout - The time to wait for the broker to acknowledge that a
+         * message has been received.
+         */
+        virtual void setSendTimeout( unsigned int timeout ) = 0;
+
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sun Aug 17 11:57:34 2008
@@ -655,12 +655,14 @@
 
         producer = new OpenWireProducerInfo( this );
         producer->setSessionInfo( session );
+        producer->setSendTimeout( this->getSendTimeout() );
 
         producerInfo = new commands::ProducerInfo();
         producer->setProducerInfo( producerInfo );
 
         commands::ProducerId* producerId = new commands::ProducerId();
         producerInfo->setProducerId( producerId );
+        producerInfo->setWindowSize( this->getProducerWindowSize( ));
 
         producerId->setConnectionId( session->getConnectionId() );
         producerId->setSessionId( session->getSessionId() );
@@ -853,7 +855,7 @@
         }
 
         // Send the message to the broker.
-        Response* response = syncRequest( amqMessage );
+        Response* response = syncRequest( amqMessage, producerInfo->getSendTimeout() );
 
         // The broker did not return an error - this is good.
         // Just discard the response.
@@ -1013,6 +1015,7 @@
         transaction->setTransactionInfo( info );
 
         return transaction;
+
     } catch( ConnectorException& ex ){
         try{ transport->close(); } catch( ... ){}
         ex.setMark(__FILE__,__LINE__);
@@ -1054,6 +1057,7 @@
         info->setType( (int)TRANSACTION_STATE_COMMITONEPHASE );
 
         oneway( info );
+
     } catch( ConnectorException& ex ){
         try{ transport->close(); } catch( ... ){}
         ex.setMark(__FILE__,__LINE__);
@@ -1095,6 +1099,7 @@
         info->setType( (int)TRANSACTION_STATE_ROLLBACK );
 
         oneway( info );
+
     } catch( ConnectorException& ex ){
         try{ transport->close(); } catch( ... ){}
         ex.setMark(__FILE__,__LINE__);
@@ -1183,7 +1188,7 @@
         rsi->setClientId( connectionInfo.getClientId() );
 
         // Send the message to the broker.
-        Response* response = syncRequest(rsi);
+        Response* response = syncRequest( rsi );
 
         // The broker did not return an error - this is good.
         // Just discard the response.
@@ -1225,7 +1230,6 @@
                  consumer->getConsumerInfo()->getDestination()->cloneDataStructure() );
              messagePull.setTimeout( timeout );
 
-             // TODO - This should be Async
              this->oneway( &messagePull );
          }
     }
@@ -1422,12 +1426,18 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Response* OpenWireConnector::syncRequest( Command* command )
+Response* OpenWireConnector::syncRequest( Command* command, unsigned int timeout )
     throw ( ConnectorException ) {
 
     try {
 
-        Response* response = transport->request( command );
+        Response* response = NULL;
+
+        if( timeout == 0 ) {
+            response = transport->request( command );
+        } else {
+            response = transport->request( command, timeout );
+        }
 
         commands::ExceptionResponse* exceptionResponse =
             dynamic_cast<commands::ExceptionResponse*>( response );

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Sun Aug 17 11:57:34 2008
@@ -34,6 +34,7 @@
 #include <cms/TextMessage.h>
 #include <cms/MapMessage.h>
 
+#include <decaf/lang/Integer.h>
 #include <decaf/lang/exceptions/InvalidStateException.h>
 #include <decaf/lang/exceptions/IllegalArgumentException.h>
 
@@ -370,7 +371,7 @@
          */
         virtual ProducerInfo* createProducer(
             const cms::Destination* destination,
-            SessionInfo* session)
+            SessionInfo* session )
                 throw ( ConnectorException );
 
         /**
@@ -605,6 +606,24 @@
 
     private:
 
+        // Gets the configured producer window size to use when creating new
+        // producers to control how much memory is used.
+        virtual unsigned int getProducerWindowSize() const {
+            return decaf::lang::Integer::parseInt(
+                properties.getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::PARAM_PRODUCERWINDOWSIZE ), "0" ) );
+        }
+
+        // Gets the time to wait for a producer send to complete, meaning the time to
+        // wait for a response.  Zero indicates to wait forever.
+        virtual unsigned int getSendTimeout() const {
+            return decaf::lang::Integer::parseInt(
+                properties.getProperty(
+                    core::ActiveMQConstants::toString(
+                        core::ActiveMQConstants::PARAM_SENDTIMEOUT ), "0" ) );
+        }
+
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() throw ( ConnectorException );
 
@@ -620,11 +639,12 @@
          * Sends a synchronous request and returns the response from the broker.
          * Converts any error responses into an exception.
          * @param command The request command.
+         * @param timeout The time to wait for a response, default is zero or infinite.
          * @returns The response sent from the broker.
          * @throws ConnectorException thrown if an error response was received
          * from the broker, or if any other error occurred.
          */
-        transport::Response* syncRequest( transport::Command* command )
+        transport::Response* syncRequest( transport::Command* command, unsigned int timeout = 0 )
             throw (ConnectorException);
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp Sun Aug 17 11:57:34 2008
@@ -109,6 +109,35 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Response* OpenWireFormatNegotiator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::request - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::request"
+                "Wire format negotiation timeout: peer did not "
+                "send his wire format." );
+        }
+
+        return next->request( command, timeout );
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void OpenWireFormatNegotiator::onCommand( Command* command ) {
 
     DataStructure* dataStructure =

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h Sun Aug 17 11:57:34 2008
@@ -101,6 +101,19 @@
                    decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
+         * Sends the given request to the server and waits for the response.
+         * First waits for the WireFormatInfo exchange to happen so that we
+         * know how to encode outbound data.
+         * @param command The request to send.
+         * @param timeout The time to wait for the response.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request.
+         */
+        virtual transport::Response* request( transport::Command* command, unsigned int timeout )
+            throw( transport::CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
          * This is called in the context of the nested transport's
          * reading thread.  In the case of a response object,
          * updates the request map and notifies those waiting on the

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireProducerInfo.h Sun Aug 17 11:57:34 2008
@@ -39,6 +39,9 @@
         // Session that this producer is attached to - we do not own this
         const SessionInfo* session;
 
+        // Send timeout, how long to wait for a response before failing.
+        unsigned int sendTimeout;
+
     public:
 
         OpenWireProducerInfo( Connector* connector ) :
@@ -47,11 +50,12 @@
             this->disableMessageIds = false;
             this->producerInfo = NULL;
             this->session = NULL;
+            this->sendTimeout = 0;
         }
 
-        virtual ~OpenWireProducerInfo() { 
+        virtual ~OpenWireProducerInfo() {
             this->close();
-            delete producerInfo; 
+            delete producerInfo;
         }
 
         /**
@@ -158,6 +162,25 @@
             return this->disableMessageIds;
         }
 
+        /**
+         * Gets the set send timeout for messages from this producer, a value of
+         * zero indicates that the Producer should wait forever for a response from
+         * the broker on the send.
+         * @return default time to wait for broker response to a message being sent.
+         */
+        virtual unsigned int getSendTimeout() const {
+            return this->sendTimeout;
+        }
+
+        /**
+         * Sets the time to wait for the broker to respond to a message being sent
+         * @param timeout - The time to wait for the broker to acknowledge that a
+         * message has been received.
+         */
+        virtual void setSendTimeout( unsigned int timeout ) {
+            this->sendTimeout = timeout;
+        }
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.cpp Sun Aug 17 11:57:34 2008
@@ -127,6 +127,57 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+Response* StompConnectionNegotiator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "StompConnectionNegotiator::request - transport already closed" );
+        }
+
+        // Once connected we just pass through all requests.
+        if( connected ) {
+            return next->request( command, timeout );
+        } else {
+
+            ConnectCommand* connect = dynamic_cast<ConnectCommand*>( command );
+
+            if( connect == NULL ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "StompConnectionNegotiator::request"
+                    "Invalid Command Received: only a connect command "
+                    "can be sent before connected." );
+            }
+
+            // Send the connect request
+            next->oneway( command );
+
+            if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "StompConnectionNegotiator::request"
+                    "Connection Negotiate timeout: peer did not "
+                    "send a connected command." );
+            }
+
+            // return the connected command
+            return connectedCmd;
+        }
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void StompConnectionNegotiator::onCommand( Command* command ) {
 
     ConnectedCommand* response =

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnectionNegotiator.h Sun Aug 17 11:57:34 2008
@@ -177,6 +177,19 @@
                    decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
+         * Sends the given request to the server and waits for the response.
+         * First waits for the WireFormatInfo exchange to happen so that we
+         * know how to encode outbound data.
+         * @param command The request to send.
+         * @param timeout The time to wait for a response.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request.
+         */
+        virtual transport::Response* request( transport::Command* command, unsigned int timeout )
+            throw( transport::CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
          * This is called in the context of the nested transport's
          * reading thread.  In the case of a response object,
          * updates the request map and notifies those waiting on the

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Sun Aug 17 11:57:34 2008
@@ -345,6 +345,7 @@
         producer->setDestination( destination );
         producer->setProducerId( producerIds.getNextSequenceId() );
         producer->setSessionInfo( session );
+        producer->setSendTimeout( 0 );
 
         return producer;
     }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Sun Aug 17 11:57:34 2008
@@ -283,6 +283,7 @@
          * Create a Consumer for the given Session
          * @param destination Destination to Subscribe to.
          * @param session Session Information.
+         * @param sendTimeout The time to wait for an Ack from the Broker, zero = forever.
          * @return Producer Information
          * @throws ConnectorException
          */
@@ -363,7 +364,7 @@
         virtual void acknowledge( const SessionInfo* session,
                                   const ConsumerInfo* consumer,
                                   const cms::Message* message,
-                                  AckType ackType = ACK_TYPE_CONSUMED)
+                                  AckType ackType = ACK_TYPE_CONSUMED )
             throw ( ConnectorException );
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompProducerInfo.h Sun Aug 17 11:57:34 2008
@@ -41,6 +41,9 @@
         // Session that this producer is attached to - we do not own this
         const SessionInfo* session;
 
+        // Send timeout, how long to wait for a response before failing.
+        unsigned int sendTimeout;
+
     public:
 
         StompProducerInfo() : ProducerInfo() {
@@ -49,6 +52,7 @@
             this->disableMessageIds = false;
             this->session = NULL;
             this->destination = NULL;
+            this->sendTimeout = 0;
         }
 
         StompProducerInfo( Connector* connector ) :
@@ -58,6 +62,7 @@
             this->disableMessageIds = false;
             this->session = NULL;
             this->destination = NULL;
+            this->sendTimeout = 0;
         }
 
         virtual ~StompProducerInfo(void) {
@@ -139,6 +144,25 @@
             return this->disableMessageIds;
         }
 
+        /**
+         * Gets the set send timeout for messages from this producer, a value of
+         * zero indicates that the Producer should wait forever for a response from
+         * the broker on the send.
+         * @return default time to wait for broker response to a message being sent.
+         */
+        virtual unsigned int getSendTimeout() const {
+            return this->sendTimeout;
+        }
+
+        /**
+         * Sets the time to wait for the broker to respond to a message being sent
+         * @param timeout - The time to wait for the broker to acknowledge that a
+         * message has been received.
+         */
+        virtual void setSendTimeout( unsigned int timeout ) {
+            this->sendTimeout = timeout;
+        }
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.cpp Sun Aug 17 11:57:34 2008
@@ -14,7 +14,7 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
- 
+
 #include "ActiveMQConstants.h"
 #include <stdio.h>
 
@@ -26,16 +26,16 @@
 string ActiveMQConstants::StaticInitializer::destOptions[NUM_OPTIONS];
 string ActiveMQConstants::StaticInitializer::uriParams[NUM_PARAMS];
 
-map< std::string, ActiveMQConstants::DestinationOption > 
+map< std::string, ActiveMQConstants::DestinationOption >
     ActiveMQConstants::StaticInitializer::destOptionMap;
-map< std::string, ActiveMQConstants::URIParam > 
+map< std::string, ActiveMQConstants::URIParam >
     ActiveMQConstants::StaticInitializer::uriParamsMap;
 
 ActiveMQConstants::StaticInitializer ActiveMQConstants::staticInits;
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConstants::StaticInitializer::StaticInitializer(){
-    
+
     destOptions[CONSUMER_PREFECTCHSIZE] = "consumer.prefetchSize";
     destOptions[CUNSUMER_MAXPENDINGMSGLIMIT] = "consumer.maximumPendingMessageLimit";
     destOptions[CONSUMER_NOLOCAL] = "consumer.noLocal";
@@ -44,10 +44,12 @@
     destOptions[CONSUMER_SELECTOR] = "consumer.selector";
     destOptions[CONSUMER_EXCLUSIVE] = "consumer.exclusive";
     destOptions[CONSUMER_PRIORITY] = "consumer.priority";
-    
+
+    uriParams[PARAM_SENDTIMEOUT] = "connection.sendTimeout";
+    uriParams[PARAM_PRODUCERWINDOWSIZE] = "connection.producerWidowSize";
     uriParams[PARAM_USERNAME] = "username";
     uriParams[PARAM_PASSWORD] = "password";
-    uriParams[PARAM_CLIENTID] = "client-id";    
+    uriParams[PARAM_CLIENTID] = "client-id";
 
     for( int ix=0; ix<NUM_OPTIONS; ++ix ){
         destOptionMap[destOptions[ix]] = (DestinationOption)ix;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConstants.h Sun Aug 17 11:57:34 2008
@@ -54,6 +54,8 @@
          */
         enum URIParam
         {
+            PARAM_SENDTIMEOUT,
+            PARAM_PRODUCERWINDOWSIZE,
             PARAM_USERNAME,
             PARAM_PASSWORD,
             PARAM_CLIENTID,

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.cpp Sun Aug 17 11:57:34 2008
@@ -221,3 +221,12 @@
         __FILE__, __LINE__,
         "IOTransport::request() - unsupported operation" );
 }
+
+////////////////////////////////////////////////////////////////////////////////
+Response* IOTransport::request( Command* command AMQCPP_UNUSED, unsigned int timeout AMQCPP_UNUSED )
+    throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){
+
+    throw decaf::lang::exceptions::UnsupportedOperationException(
+        __FILE__, __LINE__,
+        "IOTransport::request() - unsupported operation" );
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/IOTransport.h Sun Aug 17 11:57:34 2008
@@ -159,6 +159,16 @@
             throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
+         * Not supported by this class - throws an exception.
+         * @param command the command to be sent.
+         * @param timeout the time to wait for a response.
+         * @returns the response to the command sent.
+         * @throws UnsupportedOperationException.
+         */
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
          * Assigns the command listener for non-response commands.
          * @param listener the listener.
          */

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.cpp Sun Aug 17 11:57:34 2008
@@ -57,7 +57,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-unsigned int MockTransport::getNextCommandId() 
+unsigned int MockTransport::getNextCommandId()
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -127,3 +127,18 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
     AMQ_CATCHALL_THROW( CommandIOException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+Response* MockTransport::request( Command* command, unsigned int timeout AMQCPP_UNUSED )
+    throw( CommandIOException,
+           decaf::lang::exceptions::UnsupportedOperationException)
+{
+    try{
+        this->request( command );
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/MockTransport.h Sun Aug 17 11:57:34 2008
@@ -213,6 +213,11 @@
             throw( CommandIOException,
                    decaf::lang::exceptions::UnsupportedOperationException);
 
+
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException);
+
         virtual void setCommandListener( CommandListener* listener ){
             this->commandListener = listener;
         }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/Transport.h Sun Aug 17 11:57:34 2008
@@ -81,6 +81,20 @@
                     decaf::lang::exceptions::UnsupportedOperationException ) = 0;
 
         /**
+         * Sends the given command to the broker and then waits for the response.
+         * @param command - The command to be sent.
+         * @param timeout - The time to wait for this response.
+         * @return the response from the broker.
+         * @throws CommandIOException if an exception occurs during the read of the
+         * command.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException,
+                    decaf::lang::exceptions::UnsupportedOperationException ) = 0;
+
+        /**
          * Assigns the command listener for non-response commands.
          * @param listener the listener.
          */

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/TransportFilter.h Sun Aug 17 11:57:34 2008
@@ -143,6 +143,19 @@
         }
 
         /**
+         * Not supported by this class - throws an exception.
+         * @param command - The command that is sent as a request
+         * @param timeout - The the time to wait for a response.
+         * @throws CommandIOException
+         * @throws UnsupportedOperationException.
+         */
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ){
+
+            return next->request( command, timeout );
+        }
+
+        /**
          * Assigns the command listener for non-response commands.
          * @param listener the listener.
          */

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.cpp Sun Aug 17 11:57:34 2008
@@ -93,3 +93,28 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
     AMQ_CATCHALL_THROW( CommandIOException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+Response* LoggingTransport::request( Command* command, unsigned int timeout )
+    throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException) {
+
+    try {
+
+        // Delegate to the base class.
+        Response* response = TransportFilter::request( command, timeout );
+
+        ostringstream ostream;
+        ostream << "*** SENDING REQUEST COMMAND: Timeout = " << timeout << " ***" << endl;
+        ostream << command->toString() << endl;
+        ostream << "*** RECEIVED RESPONSE COMMAND ***" << endl;
+        ostream << ( response == NULL? "NULL" : response->toString() );
+
+        LOGDECAF_INFO( logger, ostream.str() );
+
+        return response;
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/LoggingTransport.h Sun Aug 17 11:57:34 2008
@@ -73,6 +73,16 @@
             throw( CommandIOException,
                    decaf::lang::exceptions::UnsupportedOperationException);
 
+        /**
+         * Not supported by this class - throws an exception.
+         * @param command the command that is sent as a request
+         * @param timeout the time to wait for a response.
+         * @throws UnsupportedOperationException.
+         */
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException);
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.cpp Sun Aug 17 11:57:34 2008
@@ -46,13 +46,9 @@
 ////////////////////////////////////////////////////////////////////////////////
 ResponseCorrelator::ResponseCorrelator( Transport* next, bool own )
 :
-    TransportFilter( next, own )
-{
-    nextCommandId = 0;
-
-    // Default max response wait time to 3 seconds.
-    maxResponseWaitTime = 3000;
+    TransportFilter( next, own ) {
 
+    nextCommandId = 0;
     // Start in the closed state.
     closed = true;
 }
@@ -68,16 +64,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-unsigned long ResponseCorrelator::getMaxResponseWaitTime() const{
-    return maxResponseWaitTime;
-}
-
-////////////////////////////////////////////////////////////////////////////////
-void ResponseCorrelator::setMaxResponseWaitTime( const unsigned long milliseconds ){
-    maxResponseWaitTime = milliseconds;
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ResponseCorrelator::oneway( Command* command )
     throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) {
 
@@ -123,7 +109,64 @@
         next->oneway( command );
 
         // Get the response.
-        response = futureResponse->getResponse( maxResponseWaitTime );
+        response = futureResponse->getResponse();
+
+        // Perform cleanup on the map.
+        synchronized( &mapMutex ){
+
+            // We've done our waiting - get this thing out
+            // of the map.
+            requestMap.erase( command->getCommandId() );
+
+            // Destroy the futureResponse.  It is safe to
+            // do this now because the other thread only
+            // accesses the futureResponse within a lock on
+            // the map.
+            delete futureResponse;
+            futureResponse = NULL;
+        }
+
+        if( response == NULL ){
+
+            throw CommandIOException( __FILE__, __LINE__,
+                "No valid response received for command: %s, check broker.",
+                command->toString().c_str() );
+        }
+
+        return response;
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Response* ResponseCorrelator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, decaf::lang::exceptions::UnsupportedOperationException ) {
+
+    try{
+        command->setCommandId( getNextCommandId() );
+        command->setResponseRequired( true );
+
+        // Add a future response object to the map indexed by this
+        // command id.
+        FutureResponse* futureResponse = new FutureResponse();
+
+        synchronized( &mapMutex ){
+            requestMap[command->getCommandId()] = futureResponse;
+        }
+
+        // Wait to be notified of the response via the futureResponse
+        // object.
+        Response* response = NULL;
+
+        // Send the request.
+        next->oneway( command );
+
+        // Get the response.
+        response = futureResponse->getResponse( timeout );
 
         // Perform cleanup on the map.
         synchronized( &mapMutex ){
@@ -235,6 +278,14 @@
 
     try{
 
+        // Wake-up any outstanding requests.
+        synchronized( &mapMutex ){
+            std::map<unsigned int, FutureResponse*>::iterator iter = requestMap.begin();
+            for( ; iter != requestMap.end(); ++iter ){
+                iter->second->setResponse( NULL );
+            }
+        }
+
         if( !closed && next != NULL ){
             next->close();
         }
@@ -245,3 +296,20 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ResponseCorrelator::onTransportException(
+    Transport* source AMQCPP_UNUSED,
+    const decaf::lang::Exception& ex ) {
+
+    // Trigger each outstanding request to complete so that we don't hang
+    // forever waiting for one that has been sent without timeout.
+    synchronized( &mapMutex ){
+        std::map<unsigned int, FutureResponse*>::iterator iter = requestMap.begin();
+        for( ; iter != requestMap.end(); ++iter ){
+            iter->second->setResponse( NULL );
+        }
+    }
+
+    fire( ex );
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelator.h Sun Aug 17 11:57:34 2008
@@ -51,11 +51,6 @@
         std::map<unsigned int, FutureResponse*> requestMap;
 
         /**
-         * Maximum amount of time in milliseconds to wait for a response.
-         */
-        unsigned long maxResponseWaitTime;
-
-        /**
          * Sync object for accessing the next command id variable.
          */
         decaf::util::concurrent::Mutex commandIdMutex;
@@ -89,18 +84,6 @@
         virtual ~ResponseCorrelator();
 
         /**
-         * Gets the maximum wait time for a response in milliseconds.
-         * @return max time that a response can take
-         */
-        virtual unsigned long getMaxResponseWaitTime() const;
-
-        /**
-         * Sets the maximum wait time for a response in milliseconds.
-         * @param milliseconds the max time that a response can take.
-         */
-        virtual void setMaxResponseWaitTime( const unsigned long milliseconds );
-
-        /**
          * Sends a one-way command.  Does not wait for any response from the
          * broker.
          * @param command the command to be sent.
@@ -124,6 +107,17 @@
                    decaf::lang::exceptions::UnsupportedOperationException );
 
         /**
+         * Sends the given request to the server and waits for the response.
+         * @param command The request to send.
+         * @param timeout The time to wait for a response.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request.
+         */
+        virtual Response* request( Command* command, unsigned int timeout )
+            throw( CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
          * This is called in the context of the nested transport's
          * reading thread.  In the case of a response object,
          * updates the request map and notifies those waiting on the
@@ -152,6 +146,14 @@
          */
         virtual void close() throw( cms::CMSException );
 
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException( Transport* source,
+                                           const decaf::lang::Exception& ex );
+
     };
 
 }}}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/transport/filters/ResponseCorrelatorFactory.cpp Sun Aug 17 11:57:34 2008
@@ -47,12 +47,6 @@
     try {
 
         ResponseCorrelator* transport = new ResponseCorrelator( next, own );
-
-        transport->setMaxResponseWaitTime(
-            (unsigned long)Long::parseLong(
-                properties.getProperty(
-                    "transport.ResponseCorrelator.maxResponseWaitTime", "3000" ) ) );
-
         return transport;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.cpp Sun Aug 17 11:57:34 2008
@@ -133,7 +133,7 @@
         // Send one request.
         MyCommand cmd;
         try{
-            correlator.request( &cmd );
+            correlator.request( &cmd, 500 );
             CPPUNIT_ASSERT(false);
         }catch( CommandIOException& ex ){
             // Expected.

Modified: activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h?rev=686632&r1=686631&r2=686632&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h (original)
+++ activemq/activemq-cpp/trunk/src/test/activemq/transport/filters/ResponseCorrelatorTest.h Sun Aug 17 11:57:34 2008
@@ -165,6 +165,16 @@
                     "stuff" );
             }
 
+            virtual Response* request( Command* command AMQCPP_UNUSED,
+                                       unsigned int timeout AMQCPP_UNUSED )
+                throw(CommandIOException, decaf::lang::exceptions::UnsupportedOperationException)
+            {
+                throw decaf::lang::exceptions::UnsupportedOperationException(
+                    __FILE__,
+                    __LINE__,
+                    "stuff" );
+            }
+
             virtual void setCommandListener( CommandListener* listener ){
                 this->listener = listener;
             }



Mime
View raw message