activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508808 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: connector/ connector/openwire/ connector/stomp/ core/
Date Sat, 17 Feb 2007 20:56:21 GMT
Author: tabish
Date: Sat Feb 17 12:56:20 2007
New Revision: 508808

URL: http://svn.apache.org/viewvc?view=rev&rev=508808
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Adding Message Send

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/ProducerInfo.h Sat Feb 17 12:56:20 2007
@@ -29,7 +29,7 @@
     {
     public:
 
-   	    virtual ~ProducerInfo(void) {}
+        virtual ~ProducerInfo(void) {}
 
         /**
          * Retrieves the default destination that this producer
@@ -67,6 +67,20 @@
          * @param session SessionnInfo pointer
          */
         virtual void setSessionInfo( const SessionInfo* session ) = 0;
+
+        /**
+         * Sets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @param value - true if message ids are disabled
+         */
+        virtual void setDisableMessageId( bool value ) = 0;
+
+        /**
+         * Gets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @returns true if message ids are disabled
+         */
+        virtual bool isDisableMessageId() const = 0;
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sat Feb 17 12:56:20 2007
@@ -86,6 +86,7 @@
     this->brokerWireFormatInfo = NULL;
     this->nextConsumerId = 1;
     this->nextProducerId = 1;
+    this->nextProducerSequenceId = 1;
     this->nextTransactionId = 1;
     this->nextSessionId = 1;
     this->nextTempDestinationId = 1;
@@ -146,6 +147,17 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+long long OpenWireConnector::getNextProducerSequenceId()
+{
+    synchronized( &mutex )
+    {
+        return nextProducerSequenceId++;
+    }
+
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
 long long OpenWireConnector::getNextTransactionId()
 {
     synchronized( &mutex )
@@ -682,39 +694,69 @@
     {
         enforceConnected();
 
-        // TODO
+        OpenWireProducerInfo* producer =
+            dynamic_cast<OpenWireProducerInfo*>( producerInfo );
+
+        if( producer == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::send - "
+                "Producer was not of the OpenWire flavor.");
+        }
+
+        const SessionInfo* session = producerInfo->getSessionInfo();
+        commands::Message* amqMessage =
+            dynamic_cast< commands::Message* >( message );
+
+        if( amqMessage == NULL )
+        {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::send - "
+                "Message is not a valid Open Wire type.");
+        }
+
+        if( !producer->isDisableMessageId() )
+        {
+            commands::MessageId* id = new commands::MessageId();
+            id->setProducerId(
+                dynamic_cast<commands::ProducerId*>(
+                    producer->getProducerInfo()->getProducerId()->cloneDataStructure() ) );
+
+            id->setProducerSequenceId( getNextProducerSequenceId() );
 
-//        const SessionInfo* session = producerInfo->getSessionInfo();
-//        Command* command = dynamic_cast< transport::Command* >( message );
-//
-//        if( command == NULL )
-//        {
-//            throw StompConnectorException(
-//                __FILE__, __LINE__,
-//                "StompConnector::send - "
-//                "Message is not a valid stomp type.");
-//        }
-//
-//        if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
-//        {
-//            StompCommand* stompCommand =
-//                dynamic_cast< StompCommand* >( message );
-//
-//            if( stompCommand == NULL )
-//            {
-//                throw StompConnectorException(
-//                    __FILE__, __LINE__,
-//                    "StompConnector::send - "
-//                    "Message is not a valid stomp type.");
-//            }
-//
-//            stompCommand->setTransactionId(
-//                Integer::toString(
-//                    session->getTransactionInfo()->getTransactionId() ) );
-//        }
-//
-//        // Send it
-//        transport->oneway( command );
+            amqMessage->setMessageId( id );
+        }
+
+        amqMessage->setProducerId(
+            dynamic_cast<commands::ProducerId*>(
+                producer->getProducerInfo()->getProducerId()->cloneDataStructure() ) );
+
+        if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+
+            const OpenWireTransactionInfo* transactionInfo =
+                dynamic_cast<const OpenWireTransactionInfo*>(
+                    producer->getSessionInfo()->getTransactionInfo() );
+
+            if( transactionInfo == NULL ) {
+                throw OpenWireConnectorException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::acknowledge - "
+                    "Transacted Session, has no Transaction Info.");
+            }
+
+            amqMessage->setTransactionId(
+                dynamic_cast<commands::TransactionId*>(
+                    transactionInfo->getTransactionInfo()->
+                        getTransactionId()->cloneDataStructure() ) );
+        }
+
+        // Send the message to the broker.
+        Response* response = syncRequest( amqMessage );
+
+        // The broker did not return an error - this is good.
+        // Just discard the response.
+        delete response;
     }
     catch( CommandIOException& ex ){
         transport->close();
@@ -999,24 +1041,24 @@
     throw ( ConnectorException, UnsupportedOperationException )
 {
     commands::RemoveSubscriptionInfo* rsi = NULL;
-    
+
     try
     {
         enforceConnected();
-        
+
         rsi = new commands::RemoveSubscriptionInfo();
         rsi->setConnectionId( dynamic_cast<commands::ConnectionId*>(
             connectionInfo.getConnectionId()->cloneDataStructure()) );
         rsi->setSubcriptionName(name);
         rsi->setClientId(connectionInfo.getClientId());
-                
+
         // Send the message to the broker.
         Response* response = syncRequest(rsi);
 
         // The broker did not return an error - this is good.
         // Just discard the response.
         delete response;
-        
+
     } catch( ConnectorException& ex ) {
         delete rsi;
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Sat Feb 17 12:56:20 2007
@@ -158,6 +158,11 @@
         long long nextProducerId;
 
         /**
+         * Next available Producer Sequence Id
+         */
+        long long nextProducerSequenceId;
+
+        /**
          * Next avaliable Consumer Id
          */
         long long nextConsumerId;
@@ -569,6 +574,7 @@
         long long getNextTransactionId();
         long long getNextSessionId();
         long long getNextTempDestinationId();
+        long long getNextProducerSequenceId();
 
         // Check for Connected State and Throw an exception if not.
         void enforceConnected() throw ( ConnectorException );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireProducerInfo.h?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireProducerInfo.h Sat Feb 17 12:56:20 2007
@@ -30,6 +30,9 @@
 
     private:
 
+        // Are Message Ids Disabled
+        bool disableMessageIds;
+
         // OpenWire ProducerInfo Command
         commands::ProducerInfo* producerInfo;
 
@@ -39,7 +42,9 @@
     public:
 
         OpenWireProducerInfo() {
-            session = NULL;
+            this->disableMessageIds = false;
+            this->producerInfo = NULL;
+            this->session = NULL;
         }
 
         virtual ~OpenWireProducerInfo() {}
@@ -128,6 +133,24 @@
          */
         virtual void setProducerInfo( commands::ProducerInfo* producerInfo ) {
             this->producerInfo = producerInfo;
+        }
+
+        /**
+         * Sets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @param value - true if message ids are disabled
+         */
+        virtual void setDisableMessageId( bool value ) {
+            this->disableMessageIds = value;
+        }
+
+        /**
+         * Gets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @returns true if message ids are disabled
+         */
+        virtual bool isDisableMessageId() const {
+            return this->disableMessageIds;
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompProducerInfo.h Sat Feb 17 12:56:20 2007
@@ -29,6 +29,9 @@
     {
     private:
 
+        // Are Message Ids Disabled
+        bool disableMessageIds;
+
         // Producer Id
         long long producerId;
 
@@ -41,9 +44,10 @@
     public:
 
         StompProducerInfo(void) {
-            producerId = 0;
-            session = NULL;
-            destination = NULL;
+            this->producerId = 0;
+            this->disableMessageIds = false;
+            this->session = NULL;
+            this->destination = NULL;
         }
         virtual ~StompProducerInfo(void) { delete destination; }
 
@@ -94,6 +98,24 @@
          */
         virtual void setSessionInfo( const SessionInfo* session ) {
             this->session = session;
+        }
+
+        /**
+         * Sets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @param value - true if message ids are disabled
+         */
+        virtual void setDisableMessageId( bool value ) {
+            this->disableMessageIds = value;
+        }
+
+        /**
+         * Gets whether Messages produced by this Producer should disable the
+         * use of the MessageId field.
+         * @returns true if message ids are disabled
+         */
+        virtual bool isDisableMessageId() const {
+            return this->disableMessageIds;
         }
 
     };

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Sat Feb 17 12:56:20 2007
@@ -46,7 +46,6 @@
 
     // Default the Delivery options
     this->defaultDeliveryMode = cms::DeliveryMode::PERSISTENT;
-    this->disableMsgId        = false;
     this->disableTimestamps   = false;
     this->defaultPriority     = 4;
     this->defaultTimeToLive   = 0;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?view=diff&rev=508808&r1=508807&r2=508808
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Sat Feb 17 12:56:20 2007
@@ -34,28 +34,25 @@
                              public ActiveMQSessionResource
     {
     private:
-   
-        // Disable the Message Id
-        bool disableMsgId;
-      
+
         // Disable sending timestamps
         bool disableTimestamps;
-        
+
         // The default delivery Mode of this Producer
         int defaultDeliveryMode;
-      
+
         // The default priority Level to send at
         int defaultPriority;
 
         // The default time to live value for messages in milliseconds
         long long defaultTimeToLive;
-      
+
         // Session that this producer sends to.
         ActiveMQSession* session;
-      
+
         // This Producers protocal specific info object
         connector::ProducerInfo* producerInfo;
-        
+
         // Boolean that indicates if the consumer has been closed
         bool closed;
 
@@ -84,7 +81,7 @@
          * @throws CMSException
          */
         virtual void send( cms::Message* message ) throw ( cms::CMSException );
-      
+
         /**
          * Sends the message to the default producer destination, but does
          * not take ownership of the message, caller must still destroy it.
@@ -95,9 +92,9 @@
          * milliseconds.
          * @throws CMSException
          */
-        virtual void send( cms::Message* message, int deliveryMode, int priority, 
+        virtual void send( cms::Message* message, int deliveryMode, int priority,
             long long timeToLive) throw ( cms::CMSException );
-            
+
         /**
          * Sends the message to the designated destination.
          * @param a Message Object Pointer
@@ -116,41 +113,47 @@
          * @param timeToLive The time to live value for this message in
          * milliseconds.
          * @throws CMSException
-         */     
+         */
         virtual void send( const cms::Destination* destination,
-            cms::Message* message, int deliveryMode, int priority, 
+            cms::Message* message, int deliveryMode, int priority,
             long long timeToLive) throw ( cms::CMSException );
-            
-        /** 
+
+        /**
          * Sets the delivery mode for this Producer
          * @param The DeliveryMode
          */
         virtual void setDeliveryMode( int mode ) {
-            defaultDeliveryMode = mode; 
+            defaultDeliveryMode = mode;
         }
-      
-        /** 
+
+        /**
          * Gets the delivery mode for this Producer
          * @return The DeliveryMode
          */
         virtual int getDeliveryMode() const {
-            return defaultDeliveryMode; 
+            return defaultDeliveryMode;
         }
-      
+
         /**
          * Sets if Message Ids are disbled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual void setDisableMessageId( bool value ) {
-            disableMsgId = value; 
+            if( producerInfo != NULL ){
+                producerInfo->setDisableMessageId( value );
+            }
         }
-      
+
         /**
          * Sets if Message Ids are disbled for this Producer
          * @param boolean indicating enable / disable (true / false)
          */
         virtual bool getDisableMessageId() const {
-            return disableMsgId;
+            if( this->producerInfo != NULL ) {
+                return this->producerInfo->isDisableMessageId();
+            }
+
+            return false;
         }
 
         /**
@@ -160,7 +163,7 @@
         virtual void setDisableMessageTimeStamp( bool value ) {
             disableTimestamps = value;
         }
-      
+
         /**
          * Sets if Message Time Stamps are disbled for this Producer
          * @param boolean indicating enable / disable (true / false)
@@ -168,15 +171,15 @@
         virtual bool getDisableMessageTimeStamp() const {
             return disableTimestamps;
         }
-      
+
         /**
          * Sets the Priority that this Producers sends messages at
          * @param int value for Priority level
          */
         virtual void setPriority( int priority ) {
-            defaultPriority = priority; 
+            defaultPriority = priority;
         }
-      
+
         /**
          * Gets the Priority level that this producer sends messages at
          * @return int based priority level
@@ -184,7 +187,7 @@
         virtual int getPriority() const {
             return defaultPriority;
         }
-      
+
         /**
          * Sets the Time to Live that this Producers sends messages with
          * @param time The new default time to live value in milliseconds.
@@ -192,7 +195,7 @@
         virtual void setTimeToLive( long long time ) {
             defaultTimeToLive = time;
         }
-  
+
         /**
          * Gets the Time to Live that this producer sends messages with
          * @return The default time to live value in milliseconds.
@@ -200,9 +203,9 @@
         virtual long long getTimeToLive() const {
             return defaultTimeToLive;
         }
-      
+
     public:  // ActiveMQSessionResource
-    
+
         /**
          * Retrieve the Connector resource that is associated with
          * this Session resource.
@@ -213,7 +216,7 @@
         }
 
     public:
-   
+
         /**
          * Retrives this object ProducerInfo pointer
          * @return ProducerInfo pointer



Mime
View raw message