activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r508817 - in /activemq/activemq-cpp/trunk/activemq-cpp/src: main/activemq/connector/ main/activemq/connector/openwire/ main/activemq/connector/stomp/ main/activemq/core/ test/activemq/connector/stomp/ test/activemq/core/
Date Sat, 17 Feb 2007 22:14:01 GMT
Author: nmittler
Date: Sat Feb 17 14:14:00 2007
New Revision: 508817

URL: http://svn.apache.org/viewvc?view=rev&rev=508817
Log:
[AMQCPP-30] adding support for receiving messages in openwire

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ConsumerMessageListener.h
Sat Feb 17 14:14:00 2007
@@ -36,10 +36,14 @@
          * Called to dispatch a message to a particular consumer.
          * @param consumer the target consumer of the dispatch.
          * @param msg the message to be dispatched.
+         * @param own If true, it is the responsibility of the callee
+         * to destroy the message object.  Otherwise, the callee must NOT
+         * destroy it.
          */
         virtual void onConsumerMessage( 
             ConsumerInfo* consumer,
-            core::ActiveMQMessage* msg ) = 0;
+            core::ActiveMQMessage* msg,
+            bool own = true ) = 0;
             
     };
         

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sat Feb 17 14:14:00 2007
@@ -391,6 +391,14 @@
         // The broker did not return an error - this is good.
         // Just discard the response.
         delete response;
+        
+        // Since we've successfully registered - add this consumer to the
+        // consumer info map.
+        synchronized( &consumerInfoMap ) {
+            consumerInfoMap.setValue( 
+                consumerInfo->getConsumerId()->getValue(),
+                consumer );
+        }
 
         return consumer;
 
@@ -1094,6 +1102,13 @@
             dynamic_cast<commands::ActiveMQTempDestination*>(resource);
 
         if( consumer != NULL ) {
+            
+            // Remove this consumer from the consumer info map
+            synchronized( &consumerInfoMap ) {
+                consumerInfoMap.remove( 
+                    consumer->getConsumerInfo()->getConsumerId()->getValue() );
+            }
+            
             dataStructure = consumer->getConsumerInfo();
         } else if( producer != NULL ) {
             dataStructure = producer->getProducerInfo();
@@ -1140,17 +1155,48 @@
 
         if( dispatch != NULL ) {
 
-//            ConsumerId* consumerId = dispatch->getConsumerId();
-//            MessageConsumer consumer = (MessageConsumer) consumers[consumerId];
-//            if (consumer == null)
-//            {
-//                Tracer.Error("No such consumer active: " + consumerId);
-//            }
-//            else
-//            {
-//                ActiveMQMessage message = (ActiveMQMessage) dispatch.Message;
-//                consumer.Dispatch(message);
-//            }
+            // Due to the severe suckiness of C++, in order to cast to
+            // a type that is in a different branch of the inheritence hierarchy
+            // we have to cast to the type at the "crotch" of the branch and then
+            // we can implicitly cast up the other branch.
+            core::ActiveMQMessage* message = dynamic_cast<commands::ActiveMQMessage*>(dispatch->getMessage());
+            if( message == NULL ) {
+                message = dynamic_cast<commands::ActiveMQTextMessage*>(dispatch->getMessage());
+            }
+            if( message == NULL ) {
+                message = dynamic_cast<commands::ActiveMQBytesMessage*>(dispatch->getMessage());
+            }
+            if( message == NULL ) {
+                message = dynamic_cast<commands::ActiveMQMapMessage*>(dispatch->getMessage());
+            }
+            if( message == NULL ) {
+                delete command;
+                throw OpenWireConnectorException( __FILE__, __LINE__, 
+                    "Received unsupported dispatch message" );
+            }
+            
+            // Get the consumer info object for this consumer.
+            OpenWireConsumerInfo* info = NULL;
+            synchronized( &consumerInfoMap ) {
+                info = consumerInfoMap.getValue( dispatch->getConsumerId()->getValue()
);
+                if( info == NULL ){
+                    delete command;
+                    throw OpenWireConnectorException( __FILE__, __LINE__,
+                        "Received dispatch message for unregistered consumer" );
+                }
+            }
+                
+            try{                                
+                
+                // Callback the listener (the connection object).
+                if( messageListener != NULL ){
+                    messageListener->onConsumerMessage(
+                        info,
+                        message,
+                        false );
+                }
+                
+            }catch( ... ){/* do nothing*/}
 
             delete command;
 
@@ -1160,13 +1206,15 @@
             this->brokerInfo = brokerInfo;
         } else if( shutdownInfo != NULL ) {
 
-            if( state != DISCONNECTED ) {
-                fire( CommandIOException(
-                    __FILE__,
-                    __LINE__,
-                    "OpenWireConnector::onCommand - "
-                    "Broker closed this connection."));
-            }
+            try {
+                if( state != DISCONNECTED ) {
+                    fire( CommandIOException(
+                        __FILE__,
+                        __LINE__,
+                        "OpenWireConnector::onCommand - "
+                        "Broker closed this connection."));
+                }
+            } catch( ... ) { /* do nothing */ }
 
             delete command;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Sat Feb 17 14:14:00 2007
@@ -53,6 +53,7 @@
 #include <activemq/connector/openwire/OpenWireCommandWriter.h>
 #include <activemq/connector/openwire/OpenWireFormat.h>
 #include <activemq/connector/openwire/OpenWireFormatNegotiator.h>
+#include <activemq/connector/openwire/OpenWireConsumerInfo.h>
 
 #include <activemq/connector/openwire/commands/ActiveMQTempDestination.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
@@ -186,6 +187,12 @@
          * Properties for the connector.
          */
         util::SimpleProperties properties;
+        
+        /**
+         * Mapping of consumer IDs to their respective
+         * consumer info object.
+         */
+        util::Map< long long, OpenWireConsumerInfo* > consumerInfoMap;
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
Sat Feb 17 14:14:00 2007
@@ -141,21 +141,6 @@
         void disconnect();
 
         /**
-         * Fires a consumer message to the observer.
-         * @param consumer the consumerinfo to use to fire with
-         * @param msg the Message object to send
-         */
-        void fire( ConsumerInfo* consumer, core::ActiveMQMessage* msg ){
-            try{
-                if( messageListener != NULL ){
-                    messageListener->onConsumerMessage(
-                        consumer,
-                        msg );
-                }
-            }catch( ... ){/* do nothing*/}
-        }
-
-        /**
          * Fires an exception event to the observing object.
          * @param ex the exception to fire
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompSessionManager.cpp
Sat Feb 17 14:14:00 2007
@@ -315,7 +315,7 @@
                 {
                     ActiveMQMessage* msg =
                         dynamic_cast< ActiveMQMessage* >( message );
-                    messageListener->onConsumerMessage( consumerInfo, msg );
+                    messageListener->onConsumerMessage( consumerInfo, msg, true );
                 }
 
                 return;
@@ -336,7 +336,7 @@
                 {
                     ActiveMQMessage* msg =
                         dynamic_cast< ActiveMQMessage* >( message->clone() );
-                    messageListener->onConsumerMessage( consumerInfo, msg );
+                    messageListener->onConsumerMessage( consumerInfo, msg, true );
                 }
             }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
Sat Feb 17 14:14:00 2007
@@ -178,7 +178,8 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConnection::onConsumerMessage( connector::ConsumerInfo* consumer,
-                                            core::ActiveMQMessage* message )
+                                            core::ActiveMQMessage* message,
+                                            bool own )
 {
     try
     {
@@ -206,7 +207,9 @@
                 Connector::DeliveredAck );
 
             // Delete the message here
-            delete message;
+            if( own ){
+                delete message;
+            }
 
             return;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.h Sat
Feb 17 14:14:00 2007
@@ -196,9 +196,14 @@
          * Called to dispatch a message to a particular consumer.
          * @param consumer the target consumer of the dispatch.
          * @param message the message to be dispatched.
+         * @param own If true, it is the responsibility of the callee
+         * to destroy the message object.  Otherwise, the callee must NOT
+         * destroy it.
+         * 
          */
         virtual void onConsumerMessage( connector::ConsumerInfo* consumer,
-                                        core::ActiveMQMessage* message );
+                                        core::ActiveMQMessage* message,
+                                        bool own );
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompConnectorTest.h
Sat Feb 17 14:14:00 2007
@@ -91,11 +91,13 @@
             virtual ~MyMessageListener(){}
 
             virtual void onConsumerMessage( ConsumerInfo* consumer,
-                core::ActiveMQMessage* msg )
+                core::ActiveMQMessage* msg,
+                bool own )
             {
                 consumers.push_back( consumer );
 
-                delete msg;
+                if( own )
+                    delete msg;
             }
         };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/connector/stomp/StompSessionManagerTest.h
Sat Feb 17 14:14:00 2007
@@ -97,11 +97,13 @@
             virtual ~MyMessageListener(){}
 
             virtual void onConsumerMessage( ConsumerInfo* consumer,
-                core::ActiveMQMessage* msg )
+                core::ActiveMQMessage* msg,
+                bool own )
             {
                 consumers.push_back( consumer );
 
-                delete msg;
+                if( own )
+                    delete msg;
             }
         };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h?view=diff&rev=508817&r1=508816&r2=508817
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/test/activemq/core/ActiveMQConnectionTest.h
Sat Feb 17 14:14:00 2007
@@ -89,7 +89,8 @@
 
             virtual void onConsumerMessage(
                 connector::ConsumerInfo* consumer,
-                core::ActiveMQMessage* msg AMQCPP_UNUSED )
+                core::ActiveMQMessage* msg AMQCPP_UNUSED,
+                bool own )
             {
                 consumers.push_back( consumer );
             }



Mime
View raw message