Return-Path: Delivered-To: apmail-activemq-commits-archive@www.apache.org Received: (qmail 75435 invoked from network); 5 Dec 2008 20:41:48 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 5 Dec 2008 20:41:48 -0000 Received: (qmail 50377 invoked by uid 500); 5 Dec 2008 20:42:00 -0000 Delivered-To: apmail-activemq-commits-archive@activemq.apache.org Received: (qmail 50318 invoked by uid 500); 5 Dec 2008 20:42:00 -0000 Mailing-List: contact commits-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list commits@activemq.apache.org Received: (qmail 50309 invoked by uid 99); 5 Dec 2008 20:42:00 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Dec 2008 12:42:00 -0800 X-ASF-Spam-Status: No, hits=-2000.0 required=10.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 05 Dec 2008 20:40:38 +0000 Received: by eris.apache.org (Postfix, from userid 65534) id 979392388878; Fri, 5 Dec 2008 12:41:26 -0800 (PST) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r723859 - in /activemq/activemq-cpp/trunk/src: main/activemq/connector/ main/activemq/connector/openwire/ main/activemq/connector/stomp/ main/activemq/core/ test-integration/activemq/test/openwire/ Date: Fri, 05 Dec 2008 20:41:26 -0000 To: commits@activemq.apache.org From: tabish@apache.org X-Mailer: svnmailer-1.0.8 Message-Id: <20081205204126.979392388878@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Author: tabish Date: Fri Dec 5 12:41:25 2008 New Revision: 723859 URL: http://svn.apache.org/viewvc?rev=723859&view=rev Log: https://issues.apache.org/activemq/browse/AMQCPP-169 Added destination remove method to the ActiveMQConnection class that calls into the connectors to attempt a remove. Additional methods can be added in the Connection class as needed for other Broker specific managment actions for the time being. Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Fri Dec 5 12:41:25 2008 @@ -53,18 +53,22 @@ protected: // Flags the state we are in for connection to broker. - enum ConnectionState - { + enum ConnectionState { CONNECTION_STATE_DISCONNECTED, CONNECTION_STATE_ERROR, CONNECTION_STATE_CONNECTING, CONNECTION_STATE_CONNECTED }; + // Flags to be applied when sending the Destination Info Command. + enum DestinationActions { + DESTINATION_ADD_OPERATION = 0, + DESTINATION_REMOVE_OPERATION = 1 + }; + public: // Connector Types - enum AckType - { + enum AckType { ACK_TYPE_DELIVERED = 0, // Message delivered but not consumed ACK_TYPE_POISON = 1, // Message could not be processed due to // poison pill but discard anyway @@ -383,8 +387,9 @@ /** * Pulls a message from the the service provider that this Connector is * associated with. This could be because the service has a prefetch - * policy that is set to zero and therefor requires each message to + * policy that is set to zero and therefore requires each message to * be pulled from the server to the client via a poll. + * * @param info - the consumer info for the consumer to pull for * @param timeout - the time that the caller is going to wait for new messages * @throw ConnectorException if a communications error occurs @@ -393,6 +398,22 @@ virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout ) throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0; + /** + * Requests that the Broker remove a Destination, destroying all resources that + * have been associated with it. The Destination is removed and does not become + * valid again until a client creates a new Destination with that name again and + * sends a message that is bound to it. + * + * @param destination + * The Destination to Remove. + * + * @throw ConnectorException if a communications error occurs + * + * @throw UnsupportedOperationException if the connector can't pull + */ + virtual void destroyDestination( const cms::Destination* destination ) + throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) = 0; + }; }} Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp Fri Dec 5 12:41:25 2008 @@ -1257,6 +1257,29 @@ } //////////////////////////////////////////////////////////////////////////////// +void OpenWireConnector::destroyDestination( const cms::Destination* destination ) + throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) { + + try { + + const commands::ActiveMQDestination* amqDestination = + dynamic_cast( destination ); + + commands::DestinationInfo command; + + command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); + command.setOperationType( DESTINATION_REMOVE_OPERATION ); + command.setDestination( amqDestination->cloneDataStructure() ); + + // Send the message to the broker. + syncRequest( &command ); + } + AMQ_CATCH_RETHROW( ConnectorException ) + AMQ_CATCH_EXCEPTION_CONVERT( Exception, OpenWireConnectorException ) + AMQ_CATCHALL_THROW( OpenWireConnectorException ) +} + +//////////////////////////////////////////////////////////////////////////////// void OpenWireConnector::closeResource( ConnectorResource* resource ) throw ( ConnectorException ) { @@ -1540,7 +1563,7 @@ commands::DestinationInfo command; command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); - command.setOperationType( 0 ); // 0 is add + command.setOperationType( DESTINATION_ADD_OPERATION ); command.setDestination( tempDestination->cloneDataStructure() ); // Send the message to the broker. @@ -1563,7 +1586,7 @@ commands::DestinationInfo command; command.setConnectionId( connectionInfo.getConnectionId()->cloneDataStructure() ); - command.setOperationType( 1 ); // 1 is remove + command.setOperationType( DESTINATION_REMOVE_OPERATION ); command.setDestination( tempDestination->cloneDataStructure() ); Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Fri Dec 5 12:41:25 2008 @@ -636,6 +636,22 @@ virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout ) throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ); + /** + * Requests that the Broker remove a Destination, destroying all resources that + * have been associated with it. The Destination is removed and does not become + * valid again until a client creates a new Destination with that name again and + * sends a message that is bound to it. + * + * @param destination + * The Destination to Remove. + * + * @throw ConnectorException if a communications error occurs + * + * @throw UnsupportedOperationException if the connector can't pull + */ + virtual void destroyDestination( const cms::Destination* destination ) + throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ); + public: // transport::CommandListener /** Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Fri Dec 5 12:41:25 2008 @@ -752,6 +752,21 @@ } //////////////////////////////////////////////////////////////////////////////// +void StompConnector::destroyDestination( const cms::Destination* destination ) + throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ) { + + try { + throw UnsupportedOperationException( + __FILE__, __LINE__, + "StompConnector::destroyDestination - No Stomp Support for Destroying Destinations"); + } + AMQ_CATCH_RETHROW( ConnectorException ) + AMQ_CATCH_RETHROW( UnsupportedOperationException ) + AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException ) + AMQ_CATCHALL_THROW( ConnectorException ); +} + +//////////////////////////////////////////////////////////////////////////////// void StompConnector::closeResource( ConnectorResource* resource ) throw ( ConnectorException ) { Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Fri Dec 5 12:41:25 2008 @@ -105,12 +105,12 @@ StompSessionManager* sessionManager; /** - * Next avaliable Producer Id + * Next available Producer Id */ util::LongSequenceGenerator producerIds; /** - * Next avaliable Transaction Id + * Next available Transaction Id */ util::LongSequenceGenerator transactionIds; @@ -224,7 +224,7 @@ /** * Creates a Session Info object for this connector - * @param ackMode Acknowledgement Mode of the Session + * @param ackMode Acknowledgment Mode of the Session * @returns Session Info Object * @throws ConnectorException */ @@ -486,7 +486,7 @@ } /** - * Sets the Listner of exceptions for this connector + * Sets the Listener of exceptions for this connector * @param listener ExceptionListener the observer. */ virtual void setExceptionListener( @@ -507,7 +507,7 @@ /** * Pulls a message from the the service provider that this Connector is * associated with. This could be because the service has a prefetch - * policy that is set to zero and therefor requires each message to + * policy that is set to zero and therefore requires each message to * be pulled from the server to the client via a poll. * @param info - the consumer info for the consumer to pull for * @param timeout - the time that the caller is going to wait for new messages @@ -517,6 +517,22 @@ virtual void pullMessage( const connector::ConsumerInfo* info, long long timeout ) throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ); + /** + * Requests that the Broker remove a Destination, destroying all resources that + * have been associated with it. The Destination is removed and does not become + * valid again until a client creates a new Destination with that name again and + * sends a message that is bound to it. + * + * @param destination + * The Destination to Remove. + * + * @throw ConnectorException if a communications error occurs + * + * @throw UnsupportedOperationException if the connector can't pull + */ + virtual void destroyDestination( const cms::Destination* destination ) + throw ( ConnectorException, decaf::lang::exceptions::UnsupportedOperationException ); + public: // transport::CommandListener /** Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.cpp Fri Dec 5 12:41:25 2008 @@ -20,19 +20,18 @@ #include #include #include -#include #include #include +using namespace std; using namespace cms; using namespace activemq; using namespace activemq::core; +using namespace activemq::connector; +using namespace activemq::exceptions; using namespace decaf::util; using namespace decaf::lang; using namespace decaf::lang::exceptions; -using namespace activemq::connector; -using namespace activemq::exceptions; -using namespace std; //////////////////////////////////////////////////////////////////////////////// ActiveMQConnection::ActiveMQConnection(ActiveMQConnectionData* connectionData) { @@ -292,3 +291,32 @@ AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) AMQ_CATCHALL_THROW( ActiveMQException ) } + +//////////////////////////////////////////////////////////////////////////////// +void ActiveMQConnection::destroyDestination( cms::Destination* destination ) + throw( decaf::lang::exceptions::NullPointerException, + decaf::lang::exceptions::IllegalStateException, + decaf::lang::exceptions::UnsupportedOperationException, + activemq::exceptions::ActiveMQException ) { + + try{ + + if( destination == NULL ) { + throw NullPointerException( + __FILE__, __LINE__, "Destination passed was NULL" ); + } + + if( this->isClosed() ) { + throw IllegalStateException( + __FILE__, __LINE__, "Connection Closed" ); + } + + // Ask the connector to perform a remove. + this->connectionData->getConnector()->destroyDestination( destination ); + } + AMQ_CATCH_RETHROW( NullPointerException ) + AMQ_CATCH_RETHROW( IllegalStateException ) + AMQ_CATCH_RETHROW( ActiveMQException ) + AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException ) + AMQ_CATCHALL_THROW( ActiveMQException ) +} Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h (original) +++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConnection.h Fri Dec 5 12:41:25 2008 @@ -20,6 +20,7 @@ #include #include +#include #include #include #include @@ -28,7 +29,9 @@ #include #include #include -#include +#include +#include +#include #include @@ -126,6 +129,38 @@ virtual void sendPullRequest( const connector::ConsumerInfo* consumer, long long timeout ) throw ( exceptions::ActiveMQException ); + /** + * Checks if this connection has been closed + * @return true if the connection is closed + */ + bool isClosed() const { + return this->closed; + } + + /** + * Requests that the Broker removes the given Destination. Calling this + * method implies that the client is finished with the Destination and that + * no other messages will be sent or received for the given Destination. The + * Broker frees all resources it has associated with this Destination. + * + * @param destination + * The Destination the Broker will be requested to remove. + * + * @throws NullPointerException + * If the passed Destination is Null + * @throws IllegalStateException + * If the connection is closed. + * @throws UnsupportedOperationException + * If the wire format in use does not support this operation. + * @throws ActiveMQException + * If any other error occurs during the attempt to destroy the destination. + */ + virtual void destroyDestination( cms::Destination* destination ) + throw( decaf::lang::exceptions::NullPointerException, + decaf::lang::exceptions::IllegalStateException, + decaf::lang::exceptions::UnsupportedOperationException, + activemq::exceptions::ActiveMQException ); + public: // Connection Interface Methods /** Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp (original) +++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.cpp Fri Dec 5 12:41:25 2008 @@ -18,6 +18,7 @@ #include "OpenwireSimpleTest.h" #include +#include #include #include @@ -25,6 +26,7 @@ using namespace std; using namespace cms; using namespace activemq; +using namespace activemq::core; using namespace activemq::test; using namespace activemq::test::openwire; using namespace activemq::util; @@ -202,3 +204,46 @@ AMQ_CATCH_RETHROW( ActiveMQException ) AMQ_CATCHALL_THROW( ActiveMQException ) } + +//////////////////////////////////////////////////////////////////////////////// +void OpenwireSimpleTest::testDestroyDestination() { + + try { + + cmsProvider->setDestinationName( "testDestroyDestination" ); + cmsProvider->reconnectSession(); + + // Create CMS Object for Comms + cms::Session* session( cmsProvider->getSession() ); + cms::MessageConsumer* consumer = cmsProvider->getConsumer(); + cms::MessageProducer* producer = cmsProvider->getProducer(); + producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); + + auto_ptr txtMessage( session->createTextMessage( "TEST MESSAGE" ) ); + + // Send some text messages + producer->send( txtMessage.get() ); + + auto_ptr message( consumer->receive( 1000 ) ); + CPPUNIT_ASSERT( message.get() != NULL ); + + ActiveMQConnection* connection = + dynamic_cast( cmsProvider->getConnection() ); + + CPPUNIT_ASSERT( connection != NULL ); + + try{ + connection->destroyDestination( cmsProvider->getDestination() ); + CPPUNIT_ASSERT_MESSAGE( "Destination Should be in use.", false ); + } catch( ActiveMQException& ex ) { + } + + cmsProvider->reconnectSession(); + + connection->destroyDestination( cmsProvider->getDestination() ); + + } catch( ActiveMQException& ex ) { + ex.printStackTrace(); + CPPUNIT_ASSERT_MESSAGE( "CAUGHT EXCEPTION", false ); + } AMQ_CATCHALL_THROW( ActiveMQException ) +} Modified: activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h?rev=723859&r1=723858&r2=723859&view=diff ============================================================================== --- activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h (original) +++ activemq/activemq-cpp/trunk/src/test-integration/activemq/test/openwire/OpenwireSimpleTest.h Fri Dec 5 12:41:25 2008 @@ -39,6 +39,7 @@ CPPUNIT_TEST( testWithZeroConsumerPrefetch ); CPPUNIT_TEST( testMapMessageSendToQueue ); CPPUNIT_TEST( testMapMessageSendToTopic ); + CPPUNIT_TEST( testDestroyDestination ); CPPUNIT_TEST_SUITE_END(); public: @@ -53,6 +54,7 @@ virtual void testWithZeroConsumerPrefetch(); virtual void testMapMessageSendToQueue(); virtual void testMapMessageSendToTopic(); + virtual void testDestroyDestination(); };