activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r508789 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: connector/ connector/openwire/ connector/stomp/ core/
Date Sat, 17 Feb 2007 18:46:23 GMT
Author: tabish
Date: Sat Feb 17 10:46:22 2007
New Revision: 508789

URL: http://svn.apache.org/viewvc?view=rev&rev=508789
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Add message ack to openwire

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/Connector.h Sat Feb 17 10:46:22 2007
@@ -46,25 +46,25 @@
 namespace connector{
 
     // Forward declarations.
-    class Connector 
-    : 
+    class Connector
+    :
         public cms::Startable,
         public cms::Closeable
     {
     public:    // Connector Types
-    
+
         enum AckType
         {
             DeliveredAck = 0,  // Message delivered but not consumed
-            PoisonAck    = 1,  // Message could not be processed due to 
+            PoisonAck    = 1,  // Message could not be processed due to
                                // poison pill but discard anyway
-            ConsumedAck  = 2   // Message consumed, discard            
+            ConsumedAck  = 2   // Message consumed, discard
         };
-    
+
     public:
-   
-   	    virtual ~Connector(void) {};
-        
+
+           virtual ~Connector(void) {};
+
         /**
          * Gets the Client Id for this connection, if this
          * connection has been closed, then this method returns ""
@@ -78,7 +78,7 @@
          * @return Username String
          */
         virtual std::string getUsername(void) const = 0;
-        
+
         /**
          * Gets the Password for this connection, if this
          * connection has been closed, then this method returns ""
@@ -92,7 +92,7 @@
          * @return reference to a transport
          * @throws InvalidStateException if the Transport is not set
          */
-        virtual transport::Transport& getTransport(void) const 
+        virtual transport::Transport& getTransport(void) const
             throw (exceptions::InvalidStateException ) = 0;
 
         /**
@@ -102,10 +102,10 @@
          * @throws ConnectorException
          */
         virtual SessionInfo* createSession(
-            cms::Session::AcknowledgeMode ackMode ) 
+            cms::Session::AcknowledgeMode ackMode )
                 throw( ConnectorException ) = 0;
-      
-        /** 
+
+        /**
          * Create a Consumer for the given Session
          * @param destination Destination to Subscribe to.
          * @param session Session Information.
@@ -115,32 +115,32 @@
          * @throws ConnectorException
          */
         virtual ConsumerInfo* createConsumer(
-            const cms::Destination* destination, 
+            const cms::Destination* destination,
             SessionInfo* session,
             const std::string& selector = "",
             bool noLocal = false )
                 throw ( ConnectorException ) = 0;
-         
-        /** 
+
+        /**
          * Create a Durable Consumer for the given Session
          * @param topic Topic to Subscribe to.
          * @param session Session Information.
          * @param name name of the Durable Topic
          * @param selector Selector
-         * @param noLocal if set, inhibits the delivery of messages 
-         *        published by its own connection 
+         * @param noLocal if set, inhibits the delivery of messages
+         *        published by its own connection
          * @return Consumer Information
          * @throws ConnectorException
          */
         virtual ConsumerInfo* createDurableConsumer(
-            const cms::Topic* topic, 
+            const cms::Topic* topic,
             SessionInfo* session,
             const std::string& name,
             const std::string& selector = "",
             bool noLocal = false )
                 throw ( ConnectorException ) = 0;
 
-        /** 
+        /**
          * Create a Consumer for the given Session
          * @param destination Destination to Subscribe to.
          * @param session Session Information.
@@ -148,7 +148,7 @@
          * @throws ConnectorException
          */
         virtual ProducerInfo* createProducer(
-            const cms::Destination* destination, 
+            const cms::Destination* destination,
             SessionInfo* session )
                 throw ( ConnectorException ) = 0;
 
@@ -159,10 +159,10 @@
          * @return a newly created Topic Object
          * @throws ConnectorException
          */
-        virtual cms::Topic* createTopic( const std::string& name, 
+        virtual cms::Topic* createTopic( const std::string& name,
                                          SessionInfo* session )
             throw ( ConnectorException ) = 0;
-          
+
         /**
          * Creates a Queue given a name and session info
          * @param name Queue Name
@@ -170,7 +170,7 @@
          * @return a newly created Queue Object
          * @throws ConnectorException
          */
-        virtual cms::Queue* createQueue( const std::string& name, 
+        virtual cms::Queue* createQueue( const std::string& name,
                                          SessionInfo* session )
             throw ( ConnectorException ) = 0;
 
@@ -183,7 +183,7 @@
         virtual cms::TemporaryTopic* createTemporaryTopic(
             SessionInfo* session )
                 throw ( ConnectorException ) = 0;
-          
+
         /**
          * Creates a Temporary Queue given a name and session info
          * @param session Session Information
@@ -200,9 +200,9 @@
          * @param producerInfo Producer Info for the sender of this message
          * @throws ConnectorException
          */
-        virtual void send( cms::Message* message, ProducerInfo* producerInfo ) 
+        virtual void send( cms::Message* message, ProducerInfo* producerInfo )
             throw ( ConnectorException ) = 0;
-      
+
         /**
          * Sends a set of Messages
          * @param messages List of Messages to send.
@@ -210,17 +210,19 @@
          * @throws ConnectorException
          */
         virtual void send( std::list<cms::Message*>& messages,
-                           ProducerInfo* producerInfo) 
+                           ProducerInfo* producerInfo)
             throw ( ConnectorException ) = 0;
-         
+
         /**
          * Acknowledges a Message
          * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
          * @param message An ActiveMQMessage to Ack.
          * @param ackType the type of ack to perform
          * @throws ConnectorException
          */
         virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
                                   const cms::Message* message,
                                   AckType ackType = ConsumedAck)
             throw ( ConnectorException ) = 0;
@@ -231,16 +233,16 @@
          * @throws ConnectorException
          */
         virtual TransactionInfo* startTransaction(
-            SessionInfo* session ) 
+            SessionInfo* session )
                 throw ( ConnectorException ) = 0;
-         
+
         /**
          * Commits a Transaction.
          * @param transaction The Transaction information
          * @param session Session Information
          * @throws ConnectorException
          */
-        virtual void commit( TransactionInfo* transaction, 
+        virtual void commit( TransactionInfo* transaction,
                              SessionInfo* session )
             throw ( ConnectorException ) = 0;
 
@@ -250,7 +252,7 @@
          * @param session Session Information
          * @throws ConnectorException
          */
-        virtual void rollback( TransactionInfo* transaction, 
+        virtual void rollback( TransactionInfo* transaction,
                                SessionInfo* session )
             throw ( ConnectorException ) = 0;
 
@@ -298,7 +300,7 @@
             TransactionInfo* transaction )
                 throw ( ConnectorException ) = 0;
 
-        /** 
+        /**
          * Unsubscribe from a givenDurable Subscription
          * @param name name of the Subscription
          * @throws ConnectorException
@@ -313,15 +315,15 @@
          */
         virtual void destroyResource( ConnectorResource* resource )
             throw ( ConnectorException ) = 0;
-            
-        /** 
+
+        /**
          * Sets the listener of consumer messages.
          * @param listener the ConsumerMessageListener observer.
          */
         virtual void setConsumerMessageListener(
             ConsumerMessageListener* listener ) = 0;
 
-        /** 
+        /**
          * Sets the Listner of exceptions for this connector
          * @param listener the ExceptionListener observer.
          */

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp Sat Feb 17 10:46:22 2007
@@ -39,6 +39,8 @@
 #include <activemq/connector/openwire/commands/RemoveInfo.h>
 #include <activemq/connector/openwire/commands/ShutdownInfo.h>
 #include <activemq/connector/openwire/commands/SessionInfo.h>
+#include <activemq/connector/openwire/commands/Message.h>
+#include <activemq/connector/openwire/commands/MessageAck.h>
 #include <activemq/connector/openwire/commands/MessageDispatch.h>
 #include <activemq/connector/openwire/commands/WireFormatInfo.h>
 #include <activemq/connector/openwire/commands/BrokerInfo.h>
@@ -469,7 +471,7 @@
 {
     OpenWireProducerInfo* producer = NULL;
     commands::ProducerInfo* producerInfo = NULL;
-    
+
     try
     {
         enforceConnected();
@@ -477,7 +479,7 @@
         producer = new OpenWireProducerInfo();
         producerInfo = new commands::ProducerInfo();
         producer->setProducerInfo( producerInfo );
-        
+
         commands::ProducerId* producerId = new commands::ProducerId();
         producerInfo->setProducerId( producerId );
 
@@ -487,49 +489,48 @@
 
         // Cast the destination to an OpenWire destination, so we can
         // get all the goodies.
-        const commands::ActiveMQDestination* amqDestination = 
+        const commands::ActiveMQDestination* amqDestination =
             dynamic_cast<const commands::ActiveMQDestination*>(destination);
         if( amqDestination == NULL ) {
-            throw ConnectorException( __FILE__, __LINE__, 
+            throw ConnectorException( __FILE__, __LINE__,
                 "Destination was either NULL or not created by this OpenWireConnector" );
         }
-                
+
+        // Get any options specified in the destination and apply them to the
+        // ProducerInfo object.
         producerInfo->setDestination( dynamic_cast<commands::ActiveMQDestination*>(
             amqDestination->cloneDataStructure()) );
-        
-        // Get any options specified in the destination and apply them to the 
-        // ProducerInfo object.
         const Properties& options = amqDestination->getOptions();
-        producerInfo->setDispatchAsync( Boolean::parseBoolean( 
+        producerInfo->setDispatchAsync( Boolean::parseBoolean(
             options.getProperty( "producer.dispatchAsync", "false" )) );
-            
+
         // Send the message to the broker.
         Response* response = syncRequest(producerInfo);
 
         // The broker did not return an error - this is good.
         // Just discard the response.
-        delete response;    
+        delete response;
 
         return producer;
-        
+
     } catch( ConnectorException& ex ) {
         delete producer;
         delete producerInfo;
-        
+
         ex.setMark( __FILE__, __LINE__ );
         throw ex;
     } catch( std::exception& ex ) {
         delete producer;
         delete producerInfo;
-        
-        throw OpenWireConnectorException( __FILE__, __LINE__, 
+
+        throw OpenWireConnectorException( __FILE__, __LINE__,
             ex.what() );
-            
+
     } catch( ... ) {
         delete producer;
         delete producerInfo;
-        
-        throw OpenWireConnectorException( __FILE__, __LINE__, 
+
+        throw OpenWireConnectorException( __FILE__, __LINE__,
             "caught unknown exception" );
     }
 }
@@ -679,39 +680,75 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireConnector::acknowledge( const connector::SessionInfo* session,
+void OpenWireConnector::acknowledge( const SessionInfo* session,
+                                     const ConsumerInfo* consumer,
                                      const cms::Message* message,
                                      AckType ackType = ConsumedAck )
     throw ( ConnectorException )
 {
     try {
 
-//        // Auto to Stomp means don't do anything, so we drop it here
-//        // for client acknowledge we have to send and ack.
-//        if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE ||
-//            session->getAckMode() == cms::Session::SESSION_TRANSACTED )
-//        {
-//            AckCommand cmd;
-//
-//            if( message->getCMSMessageId() == "" )
-//            {
-//                throw StompConnectorException(
-//                    __FILE__, __LINE__,
-//                    "StompConnector::send - "
-//                    "Message has no Message Id, cannot ack.");
-//            }
-//
-//            cmd.setMessageId( message->getCMSMessageId() );
-//
-//            if( session->getAckMode() == cms::Session::SESSION_TRANSACTED )
-//            {
-//                cmd.setTransactionId(
-//                    Integer::toString(
-//                        session->getTransactionInfo()->getTransactionId() ) );
-//            }
-//
-//            transport->oneway( &cmd );
-//        }
+        if( session->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE ||
+            session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+
+            const commands::Message* amqMessage =
+                dynamic_cast<const commands::Message*>( message );
+
+            if( amqMessage == NULL ) {
+                throw OpenWireConnectorException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::acknowledge - "
+                    "Message was not a commands::Message derivation.");
+            }
+
+            const OpenWireConsumerInfo* consumerInfo =
+                dynamic_cast<const OpenWireConsumerInfo*>( consumer );
+
+            if( consumerInfo == NULL ) {
+                throw OpenWireConnectorException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::acknowledge - "
+                    "Consumer was not of the OpenWire flavor.");
+            }
+
+            commands::MessageAck ack;
+            ack.setAckType( (int)ackType );
+            ack.setConsumerId(
+                dynamic_cast<commands::ConsumerId*>(
+                    consumerInfo->getConsumerInfo()->
+                        getConsumerId()->cloneDataStructure() ) );
+            ack.setDestination(
+                dynamic_cast<commands::ActiveMQDestination*>(
+                    amqMessage->getDestination()->cloneDataStructure() ) );
+            ack.setFirstMessageId(
+                dynamic_cast<commands::MessageId*>(
+                    amqMessage->getMessageId()->cloneDataStructure() ) );
+            ack.setLastMessageId(
+                dynamic_cast<commands::MessageId*>(
+                    amqMessage->getMessageId()->cloneDataStructure() ) );
+            ack.setMessageCount( 1 );
+
+            if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+
+                const OpenWireTransactionInfo* transactionInfo =
+                    dynamic_cast<const OpenWireTransactionInfo*>(
+                        session->getTransactionInfo() );
+
+                if( transactionInfo == NULL ) {
+                    throw OpenWireConnectorException(
+                        __FILE__, __LINE__,
+                        "OpenWireConnector::acknowledge - "
+                        "Transacted Session, has no Transaction Info.");
+                }
+
+                ack.setTransactionId(
+                    dynamic_cast<commands::TransactionId*>(
+                        transactionInfo->getTransactionInfo()->
+                            getTransactionId()->cloneDataStructure() ) );
+            }
+
+            transport->oneway( &ack );
+        }
     }
     catch( CommandIOException& ex ){
         transport->close();
@@ -1102,7 +1139,7 @@
 
         // The broker did not return an error - this is good.
         // Just discard the response.
-        delete response; 
+        delete response;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )
@@ -1128,7 +1165,7 @@
 
         // The broker did not return an error - this is good.
         // Just discard the response.
-        delete response; 
+        delete response;
     }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( OpenWireConnectorException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.h Sat Feb 17 10:46:22 2007
@@ -406,10 +406,14 @@
 
         /**
          * Acknowledges a Message
-         * @param An ActiveMQMessage to Ack.
+         * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
+         * @param message An ActiveMQMessage to Ack.
+         * @param ackType the type of ack to perform
          * @throws ConnectorException
          */
         virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
                                   const cms::Message* message,
                                   AckType ackType )
             throw ( ConnectorException );

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp Sat Feb 17 10:46:22 2007
@@ -268,12 +268,12 @@
 
         // Send the disconnect command to the broker.
         DisconnectCommand cmd;
-		transport->oneway( &cmd );
+        transport->oneway( &cmd );
 
     } catch( CommandIOException& ex ){
-		transport->close();
-		throw ex;
-	}
+        transport->close();
+        throw ex;
+    }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException );
 }
@@ -455,13 +455,13 @@
         }
 
         // Send it
-		transport->oneway( command );
+        transport->oneway( command );
+    }
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
     }
-	catch( CommandIOException& ex ){
-		transport->close();
-		throw ConnectorException( __FILE__, __LINE__,
-			ex.what() );
-	}
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
 }
@@ -488,6 +488,7 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void StompConnector::acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer AMQCPP_UNUSED,
                                   const cms::Message* message,
                                   AckType ackType AMQCPP_UNUSED)
     throw ( ConnectorException )
@@ -520,14 +521,14 @@
                         session->getTransactionInfo()->getTransactionId() ) );
             }
 
-			transport->oneway( &cmd );
+            transport->oneway( &cmd );
         }
     }
-	catch( CommandIOException& ex ){
-		transport->close();
-		throw ConnectorException( __FILE__, __LINE__,
-			ex.what() );
-	}
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
+    }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
 }
@@ -552,15 +553,15 @@
         cmd.setTransactionId(
                 Integer::toString( transaction->getTransactionId() ) );
 
-		transport->oneway( &cmd );
+        transport->oneway( &cmd );
 
         return transaction;
     }
-	catch( CommandIOException& ex ){
-		transport->close();
-		throw ConnectorException( __FILE__, __LINE__,
-			ex.what() );
-	}
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
+    }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
     return NULL;
@@ -582,11 +583,11 @@
 
         transport->oneway( &cmd );
     }
-	catch( CommandIOException& ex ){
-		transport->close();
-		throw ConnectorException( __FILE__, __LINE__,
-			ex.what() );
-	}
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
+    }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
 }
@@ -607,11 +608,11 @@
 
         transport->oneway( &cmd );
     }
-	catch( CommandIOException& ex ){
-		transport->close();
-		throw ConnectorException( __FILE__, __LINE__,
-			ex.what() );
-	}
+    catch( CommandIOException& ex ){
+        transport->close();
+        throw ConnectorException( __FILE__, __LINE__,
+            ex.what() );
+    }
     AMQ_CATCH_RETHROW( ConnectorException )
     AMQ_CATCHALL_THROW( ConnectorException );
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Sat Feb 17 10:46:22 2007
@@ -365,14 +365,16 @@
 
         /**
          * Acknowledges a Message
-         * @param session An ActiveMQMessage to Ack.
-         * @param message a message to acknowledge
-         * @param ackType the type of acknowledgement mode to use
+         * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
+         * @param message An ActiveMQMessage to Ack.
+         * @param ackType the type of ack to perform
          * @throws ConnectorException
          */
         virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
                                   const cms::Message* message,
-                                  AckType ackType )
+                                  AckType ackType = ConsumedAck)
             throw ( ConnectorException );
 
         /**

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnection.cpp Sat Feb 17 10:46:22 2007
@@ -201,6 +201,7 @@
             // was not consumed.
             connectionData->getConnector()->acknowledge(
                 consumer->getSessionInfo(),
+                consumer,
                 (Message*)message,
                 Connector::DeliveredAck );
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Sat Feb 17 10:46:22 2007
@@ -582,7 +582,9 @@
         // Delegate to connector to ack this message.
         return connection->getConnectionData()->
             getConnector()->acknowledge(
-                sessionInfo, dynamic_cast< cms::Message* >( message ) );
+                sessionInfo,
+                consumer->getConsumerInfo(),
+                dynamic_cast< cms::Message* >( message ) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Sat Feb 17 10:46:22 2007
@@ -48,16 +48,16 @@
                 "ActiveMQTransaction::ActiveMQTransaction - "
                 "Initialized with a NULL connection data");
         }
-    
+
         // Store State Data
         this->connection = connection;
         this->session    = session;
         this->taskCount  = 0;
-            
+
         // convert from property Strings to int.
-        redeliveryDelay = Integer::parseInt( 
+        redeliveryDelay = Integer::parseInt(
             properties.getProperty( "transaction.redeliveryDelay", "25" ) );
-        maxRedeliveries = Integer::parseInt( 
+        maxRedeliveries = Integer::parseInt(
             properties.getProperty( "transaction.maxRedeliveryCount", "5" ) );
 
         // Start a new Transaction
@@ -80,7 +80,7 @@
 
         // Clean up
         clearTransaction();
-        
+
         // Must allow all the tasks to complete before we destruct otherwise
         // the callbacks will cause an exception.
         synchronized( &tasksDone )
@@ -88,7 +88,7 @@
             while( taskCount != 0 )
             {
                 tasksDone.wait(1000);
-                
+
                 // TODO - Log Here to get some indication if we are stuck
             }
         }
@@ -111,14 +111,14 @@
 
         synchronized( &rollbackLock )
         {
-            // If there are any messages that are being transacted, then 
+            // If there are any messages that are being transacted, then
             // they die once and for all here.
             RollbackMap::iterator itr = rollbackMap.begin();
-            
+
             for( ; itr != rollbackMap.end(); ++itr )
             {
                 MessageList::iterator msgItr = itr->second.begin();
-                
+
                 for( ; msgItr != itr->second.end(); ++msgItr )
                 {
                    delete *msgItr;
@@ -134,18 +134,18 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
-                                            ActiveMQMessageListener* listener )
+                                            ActiveMQConsumer* consumer )
 {
     synchronized( &rollbackLock )
     {
         // Store in the Multi Map
-        rollbackMap[listener].push_back( message );
+        rollbackMap[consumer].push_back( message );
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTransaction::removeFromTransaction(
-    ActiveMQMessageListener* listener )
+    ActiveMQConsumer* consumer )
 {
     try
     {
@@ -153,22 +153,22 @@
         // the Rollback Map.
         synchronized( &rollbackLock )
         {
-            RollbackMap::iterator rb_itr = rollbackMap.find( listener );
-            
+            RollbackMap::iterator rb_itr = rollbackMap.find( consumer );
+
             if( rb_itr == rollbackMap.end() )
             {
                 return;
             }
-            
+
             MessageList::iterator itr = rb_itr->second.begin();
-            
-            for( ; itr != rollbackMap[listener].end(); ++itr )
+
+            for( ; itr != rollbackMap[consumer].end(); ++itr )
             {
                delete *itr;
             }
-            
+
             // Erase the entry from the map
-            rollbackMap.erase( listener );
+            rollbackMap.erase( consumer );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -179,7 +179,7 @@
 void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
 {
     try
-    {    
+    {
         if( this->transactionInfo == NULL )
         {
             throw InvalidStateException(
@@ -187,7 +187,7 @@
                 "ActiveMQTransaction::begin - "
                 "Commit called before transaction was started.");
         }
-        
+
         // Commit the current Transaction
         connection->getConnectionData()->getConnector()->
             commit( transactionInfo, session->getSessionInfo() );
@@ -207,7 +207,7 @@
 void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
 {
     try
-    {    
+    {
         if( this->transactionInfo == NULL )
         {
             throw InvalidStateException(
@@ -215,7 +215,7 @@
                 "ActiveMQTransaction::rollback - "
                 "Rollback called before transaction was started.");
         }
-        
+
         // Rollback the Transaction
         connection->getConnectionData()->getConnector()->
             rollback( transactionInfo, session->getSessionInfo() );
@@ -238,7 +238,7 @@
         synchronized( &rollbackLock )
         {
             RollbackMap::iterator itr = rollbackMap.begin();
-            
+
             for(; itr != rollbackMap.end(); ++itr)
             {
                 ThreadPool::getInstance()->queueTask(make_pair(
@@ -253,7 +253,7 @@
                 taskCount++;
 
             }
-            
+
             // Clear the map.  Ownership of the messages is now handed off
             // to the rollback tasks.
             rollbackMap.clear();
@@ -270,9 +270,9 @@
     {
         // Delete the task
         delete task;
-        
+
         taskCount--;
-        
+
         if( taskCount == 0 )
         {
             synchronized( &tasksDone )
@@ -284,19 +284,19 @@
     AMQ_CATCH_NOTHROW( ActiveMQException )
     AMQ_CATCHALL_NOTHROW( )
 }
-   
+
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::onTaskException( Runnable* task, 
+void ActiveMQTransaction::onTaskException( Runnable* task,
                                            exceptions::ActiveMQException& ex )
 {
     try
     {
         // Delegate
         onTaskComplete( task );
-        
+
         // Route the Error
         ExceptionListener* listener = connection->getExceptionListener();
-        
+
         if( listener != NULL )
         {
             listener->onException( ex );
@@ -310,32 +310,33 @@
 void ActiveMQTransaction::RollbackTask::run(void)
 {
     try
-    {        
+    {
         MessageList::iterator itr = messages.begin();
 
         for( ; itr != messages.end(); ++itr )
         {
             ( *itr )->setRedeliveryCount( ( *itr )->getRedeliveryCount() + 1 );
-            
+
             // Redeliver Messages at some point in the future
             Thread::sleep( redeliveryDelay );
-            
+
             if( ( *itr )->getRedeliveryCount() >= maxRedeliveries )
             {
                 // Poison Ack the Message, we give up processing this one
                 connection->getConnectionData()->getConnector()->
-                    acknowledge( 
-                        session->getSessionInfo(), 
-                        dynamic_cast< Message* >( *itr ), 
+                    acknowledge(
+                        session->getSessionInfo(),
+                        consumer->getConsumerInfo(),
+                        dynamic_cast< Message* >( *itr ),
                         Connector::PoisonAck );
 
                 // Won't redeliver this so we kill it here.
                 delete *itr;
-                
+
                 return;
             }
-            
-            listener->onActiveMQMessage( *itr );
+
+            consumer->onActiveMQMessage( *itr );
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?view=diff&rev=508789&r1=508788&r2=508789
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
Sat Feb 17 10:46:22 2007
@@ -38,7 +38,7 @@
     class ActiveMQConnection;
     class ActiveMQSession;
     class ActiveMQMessage;
-    class ActiveMQMessageListener;
+    class ActiveMQConsumer;
 
     /**
      * Transaction Management class, hold messages that are to be redelivered
@@ -67,7 +67,7 @@
 
         // Mapping of MessageListener Ids to Lists of Messages that are
         // redelivered on a Rollback
-        typedef std::map< ActiveMQMessageListener*, MessageList > RollbackMap;
+        typedef std::map< ActiveMQConsumer*, MessageList > RollbackMap;
 
     private:
 
@@ -106,7 +106,7 @@
          * @param session - the session that contains this transaction
          * @param properties - configuration parameters for this object
          */
-    	ActiveMQTransaction( ActiveMQConnection* connection,
+        ActiveMQTransaction( ActiveMQConnection* connection,
                              ActiveMQSession* session,
                              const util::Properties& properties );
 
@@ -116,18 +116,18 @@
          * Adds the Message as a part of the Transaction for the specified
          * ActiveMQConsumer.
          * @param message - Message to Transact
-         * @param listener - Listener to redeliver to on Rollback
+         * @param consumer - Consumer to redeliver to on Rollback
          */
         virtual void addToTransaction( ActiveMQMessage* message,
-                                       ActiveMQMessageListener* listener );
+                                       ActiveMQConsumer* consumer );
 
         /**
-         * Removes the ActiveMQMessageListener and all of its transacted
+         * Removes the ActiveMQConsumer and all of its transacted
          * messages from the Transaction, this is usually only done when
-         * a ActiveMQMessageListener is destroyed.
+         * an ActiveMQConsumer is destroyed.
          * @param listener - consumer who is to be removed.
          */
-        virtual void removeFromTransaction( ActiveMQMessageListener* listener );
+        virtual void removeFromTransaction( ActiveMQConsumer* listener );
 
         /**
          * Commit the current Transaction
@@ -245,7 +245,7 @@
             MessageList messages;
 
             // Consumer we are redelivering to
-            ActiveMQMessageListener* listener;
+            ActiveMQConsumer* consumer;
 
             // Connection to use for sending message acks
             ActiveMQConnection* connection;
@@ -255,7 +255,7 @@
 
         public:
 
-            RollbackTask( ActiveMQMessageListener* listener,
+            RollbackTask( ActiveMQConsumer* consumer,
                           ActiveMQConnection* connection,
                           ActiveMQSession* session,
                           MessageList& messages,
@@ -264,7 +264,7 @@
 
                 // Store State Data.
                 this->messages        = messages;
-                this->listener        = listener;
+                this->consumer        = consumer;
                 this->redeliveryDelay = redeliveryDelay;
                 this->maxRedeliveries = maxRedeliveries;
                 this->session         = session;



Mime
View raw message