activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r508784 - /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Date Sat, 17 Feb 2007 17:54:20 GMT
Author: nmittler
Date: Sat Feb 17 09:54:19 2007
New Revision: 508784

URL: http://svn.apache.org/viewvc?view=rev&rev=508784
Log:
[AMQCPP-30] adding support for creating producers in openwire

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508784&r1=508783&r2=508784
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 09:54:19 2007
@@ -21,6 +21,7 @@
 #include <activemq/transport/Transport.h>
 #include <activemq/exceptions/UnsupportedOperationException.h>
 #include <activemq/util/Integer.h>
+#include <activemq/util/Boolean.h>
 #include <activemq/util/Long.h>
 #include <activemq/util/Guid.h>
 #include <activemq/connector/openwire/OpenWireConnectorException.h>
@@ -419,42 +420,70 @@
     connector::SessionInfo* session )
         throw ( ConnectorException )
 {
+    OpenWireProducerInfo* producer = NULL;
+    commands::ProducerInfo* producerInfo = NULL;
+    
     try
     {
         enforceConnected();
 
-        OpenWireProducerInfo* producer = new OpenWireProducerInfo();
-
-        commands::ProducerInfo* producerInfo = new commands::ProducerInfo();
+        producer = new OpenWireProducerInfo();
+        producerInfo = new commands::ProducerInfo();
+        producer->setProducerInfo( producerInfo );
+        
         commands::ProducerId* producerId = new commands::ProducerId();
+        producerInfo->setProducerId( producerId );
 
         producerId->setConnectionId( session->getConnectionId() );
         producerId->setSessionId( session->getSessionId() );
         producerId->setValue( getNextProducerId() );
 
-        producerInfo->setProducerId( producerId );
-
-// TODO
-//        answer.Destination = ActiveMQDestination.Transform(destination);
-//
-//        answer.Destination = ActiveMQDestination.Transform(destination);
-//
-//        // If the destination contained a URI query, then use it to set public
-//        // properties on the ProducerInfo
-//        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
-//        if (amqDestination != null && amqDestination.Options != null)
-//        {
-//                Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
-//        }
-//
-//        return answer;
-
-        producer->setProducerInfo( producerInfo );
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const commands::ActiveMQDestination* amqDestination = 
+            dynamic_cast<const commands::ActiveMQDestination*>(destination);
+        if( amqDestination == NULL ) {
+            throw ConnectorException( __FILE__, __LINE__, 
+                "Destination was either NULL or not created by this OpenWireConnector" );
+        }
+        
+        // Get any options specified in the destination and apply them to the 
+        // ProducerInfo object.
+        producerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
+            amqDestination->cloneDataStructure()) );
+        const Properties& options = amqDestination->getOptions();
+        producerInfo->setDispatchAsync( Boolean::parseBoolean( 
+            options.getProperty( "producer.dispatchAsync", "false" )) );
+            
+        // Send the message to the broker.
+        Response* response = syncRequest(producerInfo);
+
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;    
 
         return producer;
+        
+    } catch( ConnectorException& ex ) {
+        delete producer;
+        delete producerInfo;
+        
+        ex.setMark( __FILE__, __LINE__ );
+        throw ex;
+    } catch( std::exception& ex ) {
+        delete producer;
+        delete producerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            ex.what() );
+            
+    } catch( ... ) {
+        delete producer;
+        delete producerInfo;
+        
+        throw OpenWireConnectorException( __FILE__, __LINE__, 
+            "caught unknown exception" );
     }
-    AMQ_CATCH_RETHROW( ConnectorException )
-    AMQ_CATCHALL_THROW( OpenWireConnectorException )
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -831,6 +860,8 @@
 
         OpenWireConsumerInfo* consumer =
             dynamic_cast<OpenWireConsumerInfo*>(resource);
+        OpenWireProducerInfo* producer =
+            dynamic_cast<OpenWireProducerInfo*>(resource);
         OpenWireSessionInfo* session =
             dynamic_cast<OpenWireSessionInfo*>(resource);
         commands::ActiveMQTempDestination* tempDestination =
@@ -838,6 +869,8 @@
 
         if( consumer != NULL ) {
             dataStructure = consumer->getConsumerInfo();
+        } else if( producer != NULL ) {
+            dataStructure = producer->getProducerInfo();
         } else if( session != NULL ) {
             dataStructure = session->getSessionInfo();
         } else if( tempDestination != NULL ) {
@@ -1016,7 +1049,12 @@
             dynamic_cast<commands::ActiveMQDestination*>(
                 tempDestination->cloneDataStructure() ) );
 
-        this->syncRequest( &command );
+        // Send the message to the broker.
+        Response* response = syncRequest(&command);
+
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response; 
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
@@ -1037,7 +1075,12 @@
             dynamic_cast<commands::ActiveMQDestination*>(
                 tempDestination->cloneDataStructure() ) );
 
-        this->syncRequest( &command );
+        // Send the message to the broker.
+        Response* response = syncRequest(&command);
+
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response; 
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )



Mime
View raw message