activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r508788 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Date Sat, 17 Feb 2007 18:28:53 GMT
Author: nmittler
Date: Sat Feb 17 10:28:52 2007
New Revision: 508788

URL: http://svn.apache.org/viewvc?view=rev&rev=508788
Log:
[AMQCPP-30] Adding support for creating consumers in OpenWire

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508788&r1=508787&r2=508788
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 10:28:52 2007
@@ -350,48 +350,95 @@
     const std::string& selector,
     bool noLocal )
         throw ( ConnectorException )
-{
+{   
+    OpenWireConsumerInfo* consumer = NULL;
+    commands::ConsumerInfo* consumerInfo = NULL;
+    
+    
     try
     {
         enforceConnected();
 
-        OpenWireConsumerInfo* consumer = new OpenWireConsumerInfo();
-
-        commands::ConsumerInfo* consumerInfo = new commands::ConsumerInfo();
+        consumer = new OpenWireConsumerInfo();
+        consumerInfo = new commands::ConsumerInfo();
+        consumer->setConsumerInfo( consumerInfo );
+        
         commands::ConsumerId* consumerId = new commands::ConsumerId();
+        consumerInfo->setConsumerId( consumerId );
 
         consumerId->setConnectionId( session->getConnectionId() );
         consumerId->setSessionId( session->getSessionId() );
         consumerId->setValue( getNextConsumerId() );
-
-        consumerInfo->setConsumerId( consumerId );
         consumerInfo->setSelector( selector );
 
-        // TODO
-//        answer.Destination = ActiveMQDestination.Transform(destination);
-//        answer.PrefetchSize = prefetchSize;
-//        answer.Priority = priority;
-//        answer.Exclusive = exclusive;
-//        answer.DispatchAsync = dispatchAsync;
-//        answer.Retroactive = retroactive;
-//
-//        // If the destination contained a URI query, then use it to set public properties
-//        // on the ConsumerInfo
-//        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-//        if (amqDestination != null && amqDestination.Options != null)
-//        {
-//                Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
-//        }
-//
-//        return answer;
-
-        // Store off the ConsumerInfo command
-        consumer->setConsumerInfo( consumerInfo );
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const commands::ActiveMQDestination* amqDestination = 
+            dynamic_cast<const commands::ActiveMQDestination*>(destination);
+        if( amqDestination == NULL ) {
+            throw ConnectorException( __FILE__, __LINE__, 
+                "Destination was either NULL or not created by this OpenWireConnector" );
+        }
+        
+        
+        consumerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
+            amqDestination->cloneDataStructure()) );
+            
+        // Get any options specified in the destination and apply them to the 
+        // ConsumerInfo object.
+        const Properties& options = amqDestination->getOptions();        
+        consumerInfo->setBrowser( Boolean::parseBoolean( 
+            options.getProperty( "consumer.browser", "false" )) );
+        consumerInfo->setPrefetchSize( Integer::parseInt( 
+            options.getProperty( "consumer.prefetchSize", "0" )) );
+        consumerInfo->setMaximumPendingMessageLimit( Integer::parseInt( 
+            options.getProperty( "consumer.maximumPendingMessageLimit", "0" )) );
+        consumerInfo->setDispatchAsync( Boolean::parseBoolean( 
+            options.getProperty( "consumer.dispatchAsync", "false" )) );
+        consumerInfo->setNoLocal( Boolean::parseBoolean( 
+            options.getProperty( "consumer.noLocal", "false" )) );
+        consumerInfo->setExclusive( Boolean::parseBoolean( 
+            options.getProperty( "consumer.exclusive", "false" )) );
+        consumerInfo->setRetroactive( Boolean::parseBoolean( 
+            options.getProperty( "consumer.retroactive", "false" )) );
+        consumerInfo->setPriority( Integer::parseInt( 
+            options.getProperty( "consumer.priority", "0" )) );
+        consumerInfo->setNetworkSubscription( Boolean::parseBoolean( 
+            options.getProperty( "consumer.networkSubscription", "false" )) );
+        consumerInfo->setOptimizedAcknowledge( Boolean::parseBoolean( 
+            options.getProperty( "consumer.optimizedAcknowledge", "false" )) );
+        consumerInfo->setNoRangeAcks( Boolean::parseBoolean( 
+            options.getProperty( "consumer.noRangeAcks", "false" )) );
+        
+        // Send the message to the broker.
+        Response* response = syncRequest(consumerInfo);
+        
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;
 
         return consumer;
+        
+    } catch( ConnectorException& ex ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        ex.setMark( __FILE__, __LINE__ );
+        throw ex;
+    } catch( std::exception& ex ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            ex.what() );
+            
+    } catch( ... ) {
+        delete consumer;
+        delete consumerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            "caught unknown exception" );
     }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -446,11 +493,12 @@
             throw ConnectorException( __FILE__, __LINE__, 
                 "Destination was either NULL or not created by this OpenWireConnector" );
         }
+                
+        producerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
+            amqDestination->cloneDataStructure()) );
         
         // Get any options specified in the destination and apply them to the 
         // ProducerInfo object.
-        producerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
-            amqDestination->cloneDataStructure()) );
         const Properties& options = amqDestination->getOptions();
         producerInfo->setDispatchAsync( Boolean::parseBoolean( 
             options.getProperty( "producer.dispatchAsync", "false" )) );



Mime
View raw message