activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r508794 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h
Date Sat, 17 Feb 2007 19:06:51 GMT
Author: nmittler
Date: Sat Feb 17 11:06:50 2007
New Revision: 508794

URL: http://svn.apache.org/viewvc?view=rev&rev=508794
Log:
[AMQCPP-30] adding support for creating durable consumers in openwire

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508794&r1=508793&r2=508794
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 11:06:50 2007
@@ -354,23 +354,108 @@
         throw ( ConnectorException )
 {
     OpenWireConsumerInfo* consumer = NULL;
-    commands::ConsumerInfo* consumerInfo = NULL;
-
+    commands::ConsumerInfo* consumerInfo = NULL;
+    
     try
     {
         enforceConnected();
 
         consumer = new OpenWireConsumerInfo();
-        consumerInfo = new commands::ConsumerInfo();
+        consumerInfo = createConsumerInfo( destination, session ); 
         consumer->setConsumerInfo( consumerInfo );
 
+        consumerInfo->setSelector( selector );
+        consumerInfo->setNoLocal( noLocal );       
+        
+        // Send the message to the broker.
+        Response* response = syncRequest(consumerInfo);
+        
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;
+
+        return consumer;
+        
+    } catch( ConnectorException& ex ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        ex.setMark( __FILE__, __LINE__ );
+        throw ex;
+    } catch( ... ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            "caught unknown exception" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* OpenWireConnector::createDurableConsumer(
+    const cms::Topic* topic,
+    connector::SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( ConnectorException )
+{
+    OpenWireConsumerInfo* consumer = NULL;
+    commands::ConsumerInfo* consumerInfo = NULL;
+    
+    try
+    {
+        enforceConnected();
+
+        consumer = new OpenWireConsumerInfo();
+        consumerInfo = createConsumerInfo( topic, session ); 
+        consumer->setConsumerInfo( consumerInfo );
+
+        consumerInfo->setSelector( selector );
+        consumerInfo->setNoLocal( noLocal );        
+        consumerInfo->setSubscriptionName( name );        
+        
+        // Send the message to the broker.
+        Response* response = syncRequest(consumerInfo);
+        
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;
+
+        return consumer;
+        
+    } catch( ConnectorException& ex ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        ex.setMark( __FILE__, __LINE__ );
+        throw ex;
+    } catch( ... ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            "caught unknown exception" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+commands::ConsumerInfo* OpenWireConnector::createConsumerInfo(
+    const cms::Destination* destination,
+    connector::SessionInfo* session )
+        throw ( ConnectorException )
+{   
+    commands::ConsumerInfo* consumerInfo = NULL;
+    
+    try
+    {
+        consumerInfo = new commands::ConsumerInfo();
         commands::ConsumerId* consumerId = new commands::ConsumerId();
         consumerInfo->setConsumerId( consumerId );
 
         consumerId->setConnectionId( session->getConnectionId() );
         consumerId->setSessionId( session->getSessionId() );
         consumerId->setValue( getNextConsumerId() );
-        consumerInfo->setSelector( selector );
 
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
@@ -396,9 +481,7 @@
         consumerInfo->setMaximumPendingMessageLimit( Integer::parseInt(
             options.getProperty( "consumer.maximumPendingMessageLimit", "0" )) );
         consumerInfo->setDispatchAsync( Boolean::parseBoolean(
-            options.getProperty( "consumer.dispatchAsync", "false" )) );
-        consumerInfo->setNoLocal( Boolean::parseBoolean(
-            options.getProperty( "consumer.noLocal", "false" )) );
+            options.getProperty( "consumer.dispatchAsync", "false" )) );
         consumerInfo->setExclusive( Boolean::parseBoolean(
             options.getProperty( "consumer.exclusive", "false" )) );
         consumerInfo->setRetroactive( Boolean::parseBoolean(
@@ -410,57 +493,34 @@
         consumerInfo->setOptimizedAcknowledge( Boolean::parseBoolean(
             options.getProperty( "consumer.optimizedAcknowledge", "false" )) );
         consumerInfo->setNoRangeAcks( Boolean::parseBoolean(
-            options.getProperty( "consumer.noRangeAcks", "false" )) );
+            options.getProperty( "consumer.noRangeAcks", "false" )) );
 
         // Send the message to the broker.
         Response* response = syncRequest(consumerInfo);
 
         // The broker did not return an error - this is good.
         // Just discard the response.
-        delete response;
+        delete response;
 
-        return consumer;
+        return consumerInfo;
 
     } catch( ConnectorException& ex ) {
-        delete consumer;
         delete consumerInfo;
 
         ex.setMark( __FILE__, __LINE__ );
         throw ex;
     } catch( std::exception& ex ) {
-        delete consumer;
         delete consumerInfo;
 
         throw OpenWireConnectorException( __FILE__, __LINE__,
             ex.what() );
 
     } catch( ... ) {
-        delete consumer;
         delete consumerInfo;
 
         throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
-}
-
-////////////////////////////////////////////////////////////////////////////////
-ConsumerInfo* OpenWireConnector::createDurableConsumer(
-    const cms::Topic* topic,
-    connector::SessionInfo* session,
-    const std::string& name,
-    const std::string& selector,
-    bool noLocal )
-        throw ( ConnectorException )
-{
-    try
-    {
-        enforceConnected();
-
-        // TODO
-        return NULL;
-    }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508794&r1=508793&r2=508794
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Sat Feb 17 11:06:50 2007
@@ -55,6 +55,7 @@
 #include <activemq/connector/openwire/OpenWireFormatNegotiator.h>
 
 #include <activemq/connector/openwire/commands/ConnectionInfo.h>
+#include <activemq/connector/openwire/commands/ConsumerInfo.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
 #include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
@@ -611,6 +612,16 @@
          */
         std::string createTemporaryDestinationName()
             throw ( ConnectorException );
+            
+        /**
+         * Creates a commands::ConsumerInfo object.  Used for both standard
+         * and durable consumers.
+         * @param destination The destination on which to create the consumer
+         * @param session The parent session context.
+         */
+        commands::ConsumerInfo* createConsumerInfo(
+            const cms::Destination* destination,
+            connector::SessionInfo* session ) throw ( ConnectorException );
 
     };
 



Mime
View raw message