activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508159 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireConnector.cpp OpenWireConnector.h
Date Thu, 15 Feb 2007 21:25:05 GMT
Author: tabish
Date: Thu Feb 15 13:25:05 2007
New Revision: 508159

URL: http://svn.apache.org/viewvc?view=rev&rev=508159
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508159&r1=508158&r2=508159
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Thu Feb 15 13:25:05 2007
@@ -69,6 +69,7 @@
     this->messageListener = NULL;
     this->brokerInfo = NULL;
     this->brokerWireFormatInfo = NULL;
+    this->nextConsumerId = 1;
     this->nextProducerId = 1;
     this->nextTransactionId = 1;
     this->nextSessionId = 1;
@@ -107,6 +108,17 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+long long OpenWireConnector::getNextConsumerId()
+{
+    synchronized( &mutex )
+    {
+        return nextConsumerId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 long long OpenWireConnector::getNextProducerId()
 {
     synchronized( &mutex )
@@ -318,8 +330,40 @@
     {
         enforceConnected();
 
+        OpenWireConsumerInfo* consumer = new OpenWireConsumerInfo();
+
+        commands::ConsumerInfo* consumerInfo = new commands::ConsumerInfo();
+        commands::ConsumerId* consumerId = new commands::ConsumerId();
+
+        consumerId->setConnectionId( session->getConnectionId() );
+        consumerId->setSessionId( session->getSessionId() );
+        consumerId->setValue( getNextConsumerId() );
+
+        consumerInfo->setConsumerId( consumerId );
+        consumerInfo->setSelector( selector );
+
         // TODO
-        return NULL;
+//        answer.Destination = ActiveMQDestination.Transform(destination);
+//        answer.PrefetchSize = prefetchSize;
+//        answer.Priority = priority;
+//        answer.Exclusive = exclusive;
+//        answer.DispatchAsync = dispatchAsync;
+//        answer.Retroactive = retroactive;
+//
+//        // If the destination contained a URI query, then use it to set public properties
+//        // on the ConsumerInfo
+//        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+//        if (amqDestination != null && amqDestination.Options != null)
+//        {
+//                Util.URISupport.SetProperties(answer, amqDestination.Options, "consumer.");
+//        }
+//
+//        return answer;
+
+        // Store off the ConsumerInfo command
+        consumer->setConsumerInfo( consumerInfo );
+
+        return consumer;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException )
@@ -355,8 +399,35 @@
     {
         enforceConnected();
 
-        // TODO
-        return NULL;
+        OpenWireProducerInfo* producer = new OpenWireProducerInfo();
+
+        commands::ProducerInfo* producerInfo = new commands::ProducerInfo();
+        commands::ProducerId* producerId = new commands::ProducerId();
+
+        producerId->setConnectionId( session->getConnectionId() );
+        producerId->setSessionId( session->getSessionId() );
+        producerId->setValue( getNextProducerId() );
+
+        producerInfo->setProducerId( producerId );
+
+// TODO
+//        answer.Destination = ActiveMQDestination.Transform(destination);
+//
+//        answer.Destination = ActiveMQDestination.Transform(destination);
+//
+//        // If the destination contained a URI query, then use it to set public
+//        // properties on the ProducerInfo
+//        ActiveMQDestination amqDestination = destination as ActiveMQDestination;
+//        if (amqDestination != null && amqDestination.Options != null)
+//        {
+//                Util.URISupport.SetProperties(answer, amqDestination.Options, "producer.");
+//        }
+//
+//        return answer;
+
+        producer->setProducerInfo( producerInfo );
+
+        return producer;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508159&r1=508158&r2=508159
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
Thu Feb 15 13:25:05 2007
@@ -142,10 +142,15 @@
         long long nextProducerId;
 
         /**
+         * Next available Consumer Id
+         */
+        long long nextConsumerId;
+
+        /**
          * Next avaliable Transaction Id
          */
         long long nextTransactionId;
-        
+
         /**
          * Next available Session Id.
          */
@@ -534,13 +539,14 @@
 
     private:
 
+        long long getNextConsumerId();
         long long getNextProducerId();
         long long getNextTransactionId();
         long long getNextSessionId();
 
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() throw ( ConnectorException );
-        
+
         /**
          * Sends a oneway message.
          * @param command The message to send.
@@ -548,7 +554,7 @@
          * if the operation fails for any reason.
          */
         void oneway( transport::Command* command ) throw (ConnectorException);
-        
+
         /**
          * Sends a synchronous request and returns the response from the broker.
          * Converts any error responses into an exception.
@@ -556,9 +562,9 @@
          * @returns The response sent from the broker.
          * @throws ConnectorException thrown if an error response was received
          * from the broker, or if any other error occurred.
-         */         
+         */
         transport::Response* syncRequest(transport::Command* command) throw (ConnectorException);
-        
+
         /**
          * Sends a message to the broker to dispose of the given resource.
          * @param objectId The ID of the resource to be released.



Mime
View raw message