activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r710198 - in /activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h
Date Mon, 03 Nov 2008 22:36:56 GMT
Author: tabish
Date: Mon Nov  3 14:36:56 2008
New Revision: 710198

URL: http://svn.apache.org/viewvc?rev=710198&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-200

Revamp allocation pattern to make any leaks from exceptions or otherwise less likely to occur.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=710198&r1=710197&r2=710198&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Mon Nov  3 14:36:56 2008
@@ -18,6 +18,7 @@
 #include <activemq/connector/openwire/OpenWireConnector.h>
 
 #include <typeinfo>
+#include <memory>
 #include <decaf/util/concurrent/Concurrent.h>
 #include <activemq/transport/Transport.h>
 #include <decaf/lang/exceptions/UnsupportedOperationException.h>
@@ -222,15 +223,12 @@
         connectionInfo.setConnectionId( connectionId );
 
         // Now we ping the broker and see if we get an ack / nack
-        Response* response = syncRequest( &connectionInfo );
+        syncRequest( &connectionInfo );
 
         synchronized( &mutex ) {
             // Tag us in the Connected State now.
             state = CONNECTION_STATE_CONNECTED;
         }
-
-        // Clean up the ack
-        delete response;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -279,38 +277,23 @@
         enforceConnected();
 
         // Create and initialize a new SessionInfo object
-        commands::SessionInfo* info = new commands::SessionInfo();
-        commands::SessionId* sessionId = new commands::SessionId();
+        std::auto_ptr<commands::SessionInfo> info( new commands::SessionInfo() );
+        std::auto_ptr<commands::SessionId> sessionId( new commands::SessionId() );
         sessionId->setConnectionId( connectionInfo.getConnectionId()->getValue() );
         sessionId->setValue( sessionIds.getNextSequenceId() );
-        info->setSessionId( sessionId );
+        info->setSessionId( sessionId.release() );
 
         // Create and initialize the Connector's Session Info object, this will
         // cleanup the SessionInfo command when destroyed.
-        OpenWireSessionInfo* session = new OpenWireSessionInfo( this );
-        session->setSessionInfo( info );
+        std::auto_ptr<OpenWireSessionInfo> session( new OpenWireSessionInfo( this )
);
+        session->setSessionInfo( info.release() );
         session->setAckMode( ackMode );
 
-        try{
-
-            // Send the subscription message to the broker.
-            Response* response = syncRequest( info );
-
-            // The broker did not return an error - this is good.
-            // Just discard the response.
-            delete response;
-
-            // Return the session info.
-            return session;
-
-        } catch( ConnectorException& ex ) {
-
-            // Something bad happened - free the session info object.
-            delete session;
+        // Send the subscription message to the broker.
+        syncRequest( session->getSessionInfo() );
 
-            ex.setMark(__FILE__, __LINE__);
-            throw ex;
-        }
+        // Return the session info.
+        return session.release();
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -325,17 +308,12 @@
     bool noLocal )
         throw ( ConnectorException ) {
 
-    OpenWireConsumerInfo* consumer = NULL;
-    commands::ConsumerInfo* consumerInfo = NULL;
-
     try{
 
         enforceConnected();
 
-        consumer = new OpenWireConsumerInfo( this );
-        consumer->setSessionInfo( session );
-        consumerInfo = createConsumerInfo( destination, session );
-        consumer->setConsumerInfo( consumerInfo );
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo(
+            createConsumerInfo( destination, session ) );
 
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
@@ -343,33 +321,22 @@
         /**
          * Override default options with uri-encoded parameters.
          */
-        applyDestinationOptions( consumerInfo );
+        applyDestinationOptions( consumerInfo.get() );
+
+        std::auto_ptr<OpenWireConsumerInfo> consumer( new OpenWireConsumerInfo( this
) );
+        consumer->setSessionInfo( session );
+        consumer->setConsumerInfo( consumerInfo.release() );
 
         synchronized( &consumerInfoMap ) {
             // Optimistically place the Consumer into the Map.
-            consumerInfoMap.setValue(
-                consumerInfo->getConsumerId()->getValue(),
-                consumer );
+            consumerInfoMap.setValue( consumer->getConsumerId(), consumer.get() );
         }
 
-        return consumer;
-
-    } catch( ConnectorException& ex ) {
-        delete consumer;
-        delete consumerInfo;
-        ex.setMark( __FILE__, __LINE__ );
-        throw ex;
-    } catch( Exception& ex ) {
-        delete consumer;
-        delete consumerInfo;
-        ex.setMark( __FILE__, __LINE__ );
-        throw OpenWireConnectorException( ex );
-    } catch( ... ) {
-        delete consumer;
-        delete consumerInfo;
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            "caught unknown exception" );
+        return consumer.release();;
     }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -380,18 +347,12 @@
     const std::string& selector,
     bool noLocal )
         throw ( ConnectorException ) {
-
-    OpenWireConsumerInfo* consumer = NULL;
-    commands::ConsumerInfo* consumerInfo = NULL;
 
     try{
 
         enforceConnected();
 
-        consumer = new OpenWireConsumerInfo( this );
-        consumer->setSessionInfo( session );
-        consumerInfo = createConsumerInfo( topic, session );
-        consumer->setConsumerInfo( consumerInfo );
+        auto_ptr<commands::ConsumerInfo> consumerInfo( createConsumerInfo( topic, session
) );
 
         consumerInfo->setSelector( selector );
         consumerInfo->setNoLocal( noLocal );
@@ -400,33 +361,22 @@
         /**
          * Override default options with uri-encoded parameters.
          */
-        applyDestinationOptions( consumerInfo );
+        applyDestinationOptions( consumerInfo.get() );
+
+        auto_ptr<OpenWireConsumerInfo> consumer( new OpenWireConsumerInfo( this ) );
+        consumer->setSessionInfo( session );
+        consumer->setConsumerInfo( consumerInfo.release() );
 
         synchronized( &consumerInfoMap ) {
             // Optimistically place the Consumer into the Map.
-            consumerInfoMap.setValue(
-                consumerInfo->getConsumerId()->getValue(),
-                consumer );
+            consumerInfoMap.setValue( consumer->getConsumerId(), consumer.get() );
         }
 
-        return consumer;
-
-    } catch( ConnectorException& ex ) {
-        delete consumer;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw ex;
-    } catch( Exception& ex ) {
-        delete consumer;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw OpenWireConnectorException( ex );
-    } catch( ... ) {
-        delete consumer;
-
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            "caught unknown exception" );
+        return consumer.release();
     }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -555,20 +505,19 @@
     connector::SessionInfo* session )
         throw ( ConnectorException ) {
 
-    commands::ConsumerInfo* consumerInfo = NULL;
-
     try{
 
         enforceConnected();
 
-        consumerInfo = new commands::ConsumerInfo();
-        commands::ConsumerId* consumerId = new commands::ConsumerId();
-        consumerInfo->setConsumerId( consumerId );
+        std::auto_ptr<commands::ConsumerInfo> consumerInfo( new commands::ConsumerInfo()
);
+        std::auto_ptr<commands::ConsumerId> consumerId( new commands::ConsumerId()
);
 
         consumerId->setConnectionId( session->getConnectionId() );
         consumerId->setSessionId( session->getSessionId() );
         consumerId->setValue( consumerIds.getNextSequenceId() );
 
+        consumerInfo->setConsumerId( consumerId.release() );
+
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
         const commands::ActiveMQDestination* amqDestination =
@@ -581,30 +530,11 @@
 
         consumerInfo->setDestination( amqDestination->cloneDataStructure() );
 
-        return consumerInfo;
-
-    } catch( ConnectorException& ex ) {
-        delete consumerInfo;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw ex;
-    } catch( Exception& ex ) {
-        delete consumerInfo;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw OpenWireConnectorException( ex );
-    } catch( std::exception& ex ) {
-        delete consumerInfo;
-
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            ex.what() );
-
-    } catch( ... ) {
-        delete consumerInfo;
-
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            "caught unknown exception" );
+        return consumerInfo.release();
     }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -634,11 +564,7 @@
         }
 
         // Send the message to the broker.
-        Response* response = syncRequest( consumerInfo->getConsumerInfo() );
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
+        syncRequest( consumerInfo->getConsumerInfo() );
 
         // Tag the Consumer as started
         consumerInfo->setStarted( true );
@@ -654,28 +580,23 @@
     connector::SessionInfo* session )
         throw ( ConnectorException ) {
 
-    OpenWireProducerInfo* producer = NULL;
-    commands::ProducerInfo* producerInfo = NULL;
-
     try{
 
         enforceConnected();
 
-        producer = new OpenWireProducerInfo( this );
+        std::auto_ptr<OpenWireProducerInfo> producer( new OpenWireProducerInfo( this
) );
         producer->setSessionInfo( session );
         producer->setSendTimeout( this->getSendTimeout() );
 
-        producerInfo = new commands::ProducerInfo();
-        producer->setProducerInfo( producerInfo );
-
-        commands::ProducerId* producerId = new commands::ProducerId();
-        producerInfo->setProducerId( producerId );
-        producerInfo->setWindowSize( this->getProducerWindowSize() );
-
+        std::auto_ptr<commands::ProducerId> producerId( new commands::ProducerId()
);
         producerId->setConnectionId( session->getConnectionId() );
         producerId->setSessionId( session->getSessionId() );
         producerId->setValue( producerIds.getNextSequenceId() );
 
+        std::auto_ptr<commands::ProducerInfo> producerInfo( new commands::ProducerInfo()
);
+        producerInfo->setProducerId( producerId.release() );
+        producerInfo->setWindowSize( this->getProducerWindowSize() );
+
         // Producers are allowed to have NULL destinations.  In this case, the
         // destination is specified by the messages as they are sent.
         if( destination != NULL ) {
@@ -698,31 +619,15 @@
         }
 
         // Send the message to the broker.
-        Response* response = syncRequest(producerInfo);
+        syncRequest( producerInfo.get() );
 
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
+        producer->setProducerInfo( producerInfo.release() );
 
-        return producer;
-
-    } catch( ConnectorException& ex ) {
-        delete producer;
-        ex.setMark( __FILE__, __LINE__ );
-        throw ex;
-    } catch( Exception& ex ) {
-        delete producer;
-        ex.setMark( __FILE__, __LINE__ );
-        throw OpenWireConnectorException( ex );
-    } catch( std::exception& ex ) {
-        delete producer;
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            ex.what() );
-    } catch( ... ) {
-        delete producer;
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            "caught unknown exception" );
+        return producer.release();
     }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -761,13 +666,13 @@
 
         enforceConnected();
 
-        commands::ActiveMQTempTopic* topic = new
-            commands::ActiveMQTempTopic( createTemporaryDestinationName() );
+        std::auto_ptr<commands::ActiveMQTempTopic> topic( new
+            commands::ActiveMQTempTopic( createTemporaryDestinationName() ) );
 
         // Register it with the Broker
-        this->createTemporaryDestination( topic );
+        this->createTemporaryDestination( topic.get() );
 
-        return topic;
+        return topic.release();
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -783,13 +688,13 @@
 
         enforceConnected();
 
-        commands::ActiveMQTempQueue* queue = new
-            commands::ActiveMQTempQueue( createTemporaryDestinationName() );
+        std::auto_ptr<commands::ActiveMQTempQueue> queue( new
+            commands::ActiveMQTempQueue( createTemporaryDestinationName() ) );
 
         // Register it with the Broker
-        this->createTemporaryDestination( queue );
+        this->createTemporaryDestination( queue.get() );
 
-        return queue;
+        return queue.release();
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -874,11 +779,8 @@
         } else {
 
             // Send the message to the broker.
-            Response* response = syncRequest( amqMessage, producerInfo->getSendTimeout()
);
+            syncRequest( amqMessage, producerInfo->getSendTimeout() );
 
-            // The broker did not return an error - this is good.
-            // Just discard the response.
-            delete response;
         }
 
     } catch( ConnectorException& ex ){
@@ -1102,27 +1004,27 @@
 
         enforceConnected();
 
-        OpenWireTransactionInfo* transaction =
-            new OpenWireTransactionInfo( this );
+        std::auto_ptr<OpenWireTransactionInfo> transaction(
+            new OpenWireTransactionInfo( this ) );
 
         // Place Transaction Data in session for later use as well as
         // the session in the Transaction Data
-        session->setTransactionInfo( transaction );
+        session->setTransactionInfo( transaction.get() );
         transaction->setSessionInfo( session );
 
         // Prepare and send the Transaction command
-        commands::TransactionInfo* info = new commands::TransactionInfo();
+        std::auto_ptr<commands::TransactionInfo> info( new commands::TransactionInfo()
);
 
         info->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure()
);
         info->setTransactionId( createLocalTransactionId() );
         info->setType( (int)TRANSACTION_STATE_BEGIN );
 
-        oneway( info );
+        oneway( info.get() );
 
         // Store for later
-        transaction->setTransactionInfo( info );
+        transaction->setTransactionInfo( info.release() );
 
-        return transaction;
+        return transaction.release();
 
     } catch( ConnectorException& ex ){
         try{ transport->close(); } catch( ... ){}
@@ -1284,40 +1186,23 @@
 void OpenWireConnector::unsubscribe( const std::string& name )
     throw ( ConnectorException, UnsupportedOperationException ) {
 
-    commands::RemoveSubscriptionInfo* rsi = NULL;
-
     try {
 
         enforceConnected();
 
-        rsi = new commands::RemoveSubscriptionInfo();
+        std::auto_ptr<commands::RemoveSubscriptionInfo> rsi(
+            new commands::RemoveSubscriptionInfo() );
+
         rsi->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure()
);
         rsi->setSubcriptionName( name );
         rsi->setClientId( connectionInfo.getClientId() );
 
         // Send the message to the broker.
-        Response* response = syncRequest( rsi );
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
-
-    } catch( ConnectorException& ex ) {
-        delete rsi;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw ex;
-    } catch( Exception& ex ) {
-        delete rsi;
-
-        ex.setMark( __FILE__, __LINE__ );
-        throw OpenWireConnectorException( ex );
-    } catch( ... ) {
-        delete rsi;
-
-        throw OpenWireConnectorException( __FILE__, __LINE__,
-            "caught unknown exception" );
+        syncRequest( rsi.get() );
     }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
+    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -1540,21 +1425,21 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-Response* OpenWireConnector::syncRequest( Command* command, unsigned int timeout )
+void OpenWireConnector::syncRequest( Command* command, unsigned int timeout )
     throw ( ConnectorException ) {
 
     try {
 
-        Response* response = NULL;
+        std::auto_ptr<Response> response;
 
         if( timeout == 0 ) {
-            response = transport->request( command );
+            response.reset( transport->request( command ) );
         } else {
-            response = transport->request( command, timeout );
+            response.reset( transport->request( command, timeout ) );
         }
 
         commands::ExceptionResponse* exceptionResponse =
-            dynamic_cast<commands::ExceptionResponse*>( response );
+            dynamic_cast<commands::ExceptionResponse*>( response.get() );
 
         if( exceptionResponse != NULL ) {
 
@@ -1564,15 +1449,9 @@
                         exceptionResponse->getException() );
             BrokerException exception( __FILE__, __LINE__, brokerError );
 
-            // Free the response command.
-            delete response;
-
             // Throw the exception.
             throw exception;
         }
-
-        // Nothing bad happened - just return the response.
-        return response;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( CommandIOException, OpenWireConnectorException )
@@ -1603,7 +1482,7 @@
     try{
         commands::RemoveInfo command;
         command.setObjectId( objectId->cloneDataStructure() );
-        delete this->syncRequest( &command, timeout );
+        this->syncRequest( &command, timeout );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -1623,11 +1502,7 @@
         command.setDestination( tempDestination->cloneDataStructure() );
 
         // Send the message to the broker.
-        Response* response = syncRequest(&command);
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
+        syncRequest( &command );
 
         // Now that its setup, link it to this Connector
         tempDestination->setConnector( this );
@@ -1651,11 +1526,7 @@
             tempDestination->cloneDataStructure() );
 
         // Send the message to the broker.
-        Response* response = syncRequest(&command);
-
-        // The broker did not return an error - this is good.
-        // Just discard the response.
-        delete response;
+        syncRequest( &command );
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )
@@ -1680,12 +1551,12 @@
     throw ( ConnectorException ) {
 
     try{
-        commands::LocalTransactionId* id = new commands::LocalTransactionId();
+        std::auto_ptr<commands::LocalTransactionId> id( new commands::LocalTransactionId()
);
 
         id->setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure()
);
         id->setValue( transactionIds.getNextSequenceId() );
 
-        return id;
+        return id.release();
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException )

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=710198&r1=710197&r2=710198&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Mon
Nov  3 14:36:56 2008
@@ -713,11 +713,10 @@
          * Converts any error responses into an exception.
          * @param command The request command.
          * @param timeout The time to wait for a response, default is zero or infinite.
-         * @returns The response sent from the broker.
          * @throws ConnectorException thrown if an error response was received
          * from the broker, or if any other error occurred.
          */
-        transport::Response* syncRequest( transport::Command* command, unsigned int timeout
= 0 )
+        void syncRequest( transport::Command* command, unsigned int timeout = 0 )
             throw (ConnectorException);
 
         /**



Mime
View raw message