activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r688552 - in /activemq/activemq-cpp/trunk/src/main/activemq: connector/ connector/openwire/ connector/stomp/ core/
Date Sun, 24 Aug 2008 19:56:42 GMT
Author: tabish
Date: Sun Aug 24 12:56:42 2008
New Revision: 688552

URL: http://svn.apache.org/viewvc?rev=688552&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQCPP-195

Changed the way messages are handled in transacted sessions; now they are all ack'd even if
the commit is done in the onMessage callback.  Also improved transaction support to not ack
every message until the commit is done, so that one large ack can be done in openwire, reducing
network overhead.

Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/Connector.h Sun Aug 24 12:56:42
2008
@@ -252,6 +252,21 @@
             throw ( ConnectorException ) = 0;
 
         /**
+         * Acknowledges a Message set, using the most efficient means possible
+         * for the type of connector being used,
+         * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
+         * @param messages A set of ActiveMQMessages to Ack.
+         * @param ackType the type of ack to perform
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
+                                  const std::list<const cms::Message*>& message,
+                                  AckType ackType = ACK_TYPE_CONSUMED)
+            throw ( ConnectorException ) = 0;
+
+        /**
          * Starts a new Transaction.
          * @param Session Information
          * @throws ConnectorException

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sun Aug 24 12:56:42 2008
@@ -987,6 +987,94 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void OpenWireConnector::acknowledge( const SessionInfo* session,
+                                     const ConsumerInfo* consumer,
+                                     const std::list<const cms::Message*>& messages,
+                                     AckType ackType )
+    throw ( ConnectorException ) {
+
+    enforceConnected();
+
+    if( messages.empty() ) {
+        return;
+    }
+
+    try{
+
+        const commands::Message* amqMessage =
+            dynamic_cast<const commands::Message*>( *messages.rbegin() );
+
+        if( amqMessage == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::acknowledge - "
+                "Message was not a commands::Message derivation.");
+        }
+
+        const OpenWireConsumerInfo* consumerInfo =
+            dynamic_cast<const OpenWireConsumerInfo*>( consumer );
+
+        if( consumerInfo == NULL ) {
+            throw OpenWireConnectorException(
+                __FILE__, __LINE__,
+                "OpenWireConnector::acknowledge - "
+                "Consumer was not of the OpenWire flavor.");
+        }
+
+        commands::MessageAck ack;
+        ack.setAckType( (int)ackType );
+        ack.setConsumerId(
+            consumerInfo->getConsumerInfo()->getConsumerId()->cloneDataStructure()
);
+        ack.setDestination( amqMessage->getDestination()->cloneDataStructure() );
+        ack.setLastMessageId( amqMessage->getMessageId()->cloneDataStructure() );
+        ack.setMessageCount( messages.size() );
+
+        if( session->getAckMode() == cms::Session::SESSION_TRANSACTED ) {
+
+            const OpenWireTransactionInfo* transactionInfo =
+                dynamic_cast<const OpenWireTransactionInfo*>(
+                    session->getTransactionInfo() );
+
+            if( transactionInfo == NULL ||
+                transactionInfo->getTransactionInfo() == NULL ||
+                transactionInfo->getTransactionInfo()->getTransactionId() == NULL )
{
+                throw OpenWireConnectorException(
+                    __FILE__, __LINE__,
+                    "OpenWireConnector::acknowledge - "
+                    "Transacted Session, has no Transaction Info.");
+            }
+
+            const commands::TransactionId* transactionId =
+                dynamic_cast<const commands::TransactionId*>(
+                    transactionInfo->getTransactionInfo()->getTransactionId() );
+
+            commands::TransactionId* clonedTransactionId =
+                transactionId->cloneDataStructure();
+
+            ack.setTransactionId( clonedTransactionId );
+        }
+
+        oneway( &ack );
+
+    } catch( ConnectorException& ex ){
+        try{ transport->close(); } catch( ... ){}
+
+        ex.setMark(__FILE__,__LINE__);
+        throw ex;
+    } catch( Exception& ex ){
+        try{ transport->close(); } catch( ... ){}
+
+        ex.setMark(__FILE__,__LINE__);
+        throw OpenWireConnectorException( ex );
+    } catch( ... ) {
+        try{ transport->close(); } catch( ... ){}
+
+        throw OpenWireConnectorException( __FILE__, __LINE__,
+            "Caught unknown exception" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 TransactionInfo* OpenWireConnector::startTransaction(
     connector::SessionInfo* session )
         throw ( ConnectorException ) {
@@ -1360,10 +1448,11 @@
             delete command;
 
         } else if( typeid( *command ) == typeid( commands::ProducerAck ) ) {
-            commands::ProducerAck* producerAck =
-                dynamic_cast<commands::ProducerAck*>( command );
 
             // TODO - Apply The Ack.
+            //commands::ProducerAck* producerAck =
+            //    dynamic_cast<commands::ProducerAck*>( command );
+
             delete command;
 
         } else if( typeid( *command ) == typeid( commands::WireFormatInfo ) ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/openwire/OpenWireConnector.h Sun
Aug 24 12:56:42 2008
@@ -452,6 +452,21 @@
             throw ( ConnectorException );
 
         /**
+         * Acknowledges a Message set, using the most efficient means possible
+         * for the type of connector being used,
+         * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
+         * @param messages A set of ActiveMQMessages to Ack.
+         * @param ackType the type of ack to perform
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
+                                  const std::list<const cms::Message*>& message,
+                                  AckType ackType = ACK_TYPE_CONSUMED)
+            throw ( ConnectorException );
+
+        /**
          * Starts a new Transaction.
          * @param Session Information
          * @throws ConnectorException

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.cpp Sun Aug
24 12:56:42 2008
@@ -524,6 +524,28 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void StompConnector::acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
+                                  const std::list<const cms::Message*>& messages,
+                                  AckType ackType )
+    throw ( ConnectorException ) {
+
+    try {
+
+        enforceConnected();
+
+        std::list<const cms::Message*>::const_iterator iter = messages.begin();
+
+        for( ; iter != messages.end(); ++iter ) {
+            this->acknowledge( session, consumer, *iter, ackType );
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
 TransactionInfo* StompConnector::startTransaction(
     SessionInfo* session )
         throw ( ConnectorException ) {

Modified: activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/connector/stomp/StompConnector.h Sun Aug
24 12:56:42 2008
@@ -368,6 +368,21 @@
             throw ( ConnectorException );
 
         /**
+         * Acknowledges a Message set, using the most efficient means possible
+         * for the type of connector being used,
+         * @param session the Session that the message is linked to
+         * @param consumer the Consumer that the message was linked to
+         * @param messages A set of ActiveMQMessages to Ack.
+         * @param ackType the type of ack to perform
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const ConsumerInfo* consumer,
+                                  const std::list<const cms::Message*>& message,
+                                  AckType ackType = ACK_TYPE_CONSUMED)
+            throw ( ConnectorException );
+
+        /**
          * Starts a new Transaction.
          * @param session Session Information
          * @throws ConnectorException

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.cpp Sun Aug 24 12:56:42
2008
@@ -22,7 +22,9 @@
 #include <decaf/util/Date.h>
 #include <activemq/util/Config.h>
 #include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQTransaction.h>
 #include <activemq/core/ActiveMQMessage.h>
 #include <cms/ExceptionListener.h>
 
@@ -39,7 +41,8 @@
 
 ////////////////////////////////////////////////////////////////////////////////
 ActiveMQConsumer::ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
-                                    ActiveMQSession* session ) {
+                                    ActiveMQSession* session,
+                                    ActiveMQTransaction* transaction ) {
 
     if( session == NULL || consumerInfo == NULL ) {
         throw ActiveMQException(
@@ -49,6 +52,7 @@
 
     // Init Producer Data
     this->session = session;
+    this->transaction = transaction;
     this->consumerInfo = consumerInfo;
     this->listener = NULL;
     this->closed = false;
@@ -107,11 +111,10 @@
                     haveException = true;
                 }
             }
-            
+
             // Wakeup any synchronous consumers.
-            synchronized( &unconsumedMessages )
-            {
-                unconsumedMessages.notifyAll(); 
+            synchronized( &unconsumedMessages ) {
+                unconsumedMessages.notifyAll();
             }
 
             // If we encountered an error, propagate it.
@@ -200,10 +203,8 @@
 
                 // Return the message.
                 return message;
-
-            } // while( true )
-
-        } // synchronized( &unconsumedMessages )
+            }
+        }
 
         return NULL;
     }
@@ -233,14 +234,14 @@
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed(msg);
+        beforeMessageIsConsumed( msg );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed(msg, false);
+        afterMessageIsConsumed( msg, false );
 
         // Return the cloned message.
         return clonedMsg;
@@ -272,14 +273,14 @@
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed(msg);
+        beforeMessageIsConsumed( msg );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed(msg, false);
+        afterMessageIsConsumed( msg, false );
 
         // Return the cloned message.
         return clonedMsg;
@@ -311,14 +312,14 @@
         }
 
         // Message preprocessing
-        beforeMessageIsConsumed(msg);
+        beforeMessageIsConsumed( msg );
 
         // Need to clone the message because the user is responsible for freeing
         // its copy of the message.
         cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
 
         // Post processing (may result in the message being deleted)
-        afterMessageIsConsumed(msg, false);
+        afterMessageIsConsumed( msg, false );
 
         // Return the cloned message.
         return clonedMsg;
@@ -375,23 +376,40 @@
         // acknowledge method.
         message->setAckHandler( this );
     }
+
+    // If the session is transacted then we hand off the message to it to
+    // be stored for later redelivery.  We do need to check and see if we
+    // are approaching the prefetch limit and send an Delivered ack just so
+    // we continue to receive messages, otherwise we'd stall.
+    if( session->isTransacted() ) {
+
+        if( transaction == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "In a Transacted Session but no Transaction Context set." );
+        }
+
+        // Store the message in the transaction, we clone the message into the
+        // transaction so that there is a copy to commit if commit is called in
+        // the async onMessage method and also so we know that our copy can
+        // be deleted.
+        transaction->addToTransaction(
+            dynamic_cast<cms::Message*>( message )->clone(), this );
+    }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::afterMessageIsConsumed( ActiveMQMessage* message,
-    bool messageExpired AMQCPP_UNUSED ) {
+                                               bool messageExpired AMQCPP_UNUSED ) {
 
     try{
 
-        if( !session->isClientAcknowledge() ) {
-            session->acknowledge( this, message );
+        if( session->isAutoAcknowledge() || messageExpired ) {
+            this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
         }
 
-        // The Message is cleaned up here if the Session is not
-        // transacted, otherwise we let the transaction clean up
-        // this message as it will have already been ack'd and
-        // stored for later redelivery.
-        destroyMessage( message );
+        // The Message is cleaned up here.
+        delete message;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -410,10 +428,34 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        // Delegate the Ack to the Session, we cast away copnstness since
-        // in a transactional session we might need to redeliver this
-        // message and update its data.
-        session->acknowledge( this, const_cast<ActiveMQMessage*>(message) );
+        // Send an ack indicating that the client has consumed the message
+        this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::acknowledge( const ActiveMQMessage* message, int ackType )
+    throw ( cms::CMSException ) {
+
+    try{
+
+        if( closed ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQConsumer::receive - This Consumer is closed" );
+        }
+
+        // Delegate the Ack to the Session.
+        // Delegate to connector to ack this message.
+        session->getConnection()->getConnectionData()->
+            getConnector()->acknowledge(
+                session->getSessionInfo(),
+                this->getConsumerInfo(),
+                dynamic_cast<const cms::Message*>( message ),
+                (Connector::AckType)ackType );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
@@ -429,7 +471,7 @@
 
         // Don't dispatch expired messages, ack it and then destroy it
         if( message->isExpired() ) {
-            session->acknowledge( this, message );
+            this->acknowledge( message, Connector::ACK_TYPE_CONSUMED );
             delete message;
 
             // stop now, don't queue
@@ -471,10 +513,7 @@
         synchronized( &unconsumedMessages ) {
 
             while( !unconsumedMessages.empty() ) {
-                // destroy these messages if this is not a transacted
-                // session, if it is then the tranasction will clean
-                // the messages up.
-                destroyMessage( unconsumedMessages.pop().getMessage() );
+                delete unconsumedMessages.pop().getMessage();
             }
         }
     }
@@ -502,24 +541,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::destroyMessage( ActiveMQMessage* message )
-    throw ( ActiveMQException ) {
-
-    try {
-        /**
-         * Only destroy the message if the session is NOT transacted.  If
-         * it is, the session will take care of it.
-         */
-        if( message != NULL && !session->isTransacted() ) {
-            delete message;
-        }
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::onConnectorResourceClosed(
     const ConnectorResource* resource ) throw ( cms::CMSException ) {
 

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQConsumer.h Sun Aug 24 12:56:42
2008
@@ -36,6 +36,7 @@
 namespace core{
 
     class ActiveMQSession;
+    class ActiveMQTransaction;
 
     class AMQCPP_API ActiveMQConsumer :
         public cms::MessageConsumer,
@@ -51,6 +52,11 @@
         ActiveMQSession* session;
 
         /**
+         * The Transaction Context, null if not in a Transacted Session.
+         */
+        ActiveMQTransaction* transaction;
+
+        /**
          * The Consumer info for this Consumer
          */
         connector::ConsumerInfo* consumerInfo;
@@ -76,7 +82,8 @@
          * Constructor
          */
         ActiveMQConsumer( connector::ConsumerInfo* consumerInfo,
-                          ActiveMQSession* session );
+                          ActiveMQSession* session,
+                          ActiveMQTransaction* transaction );
 
         virtual ~ActiveMQConsumer();
 
@@ -138,8 +145,9 @@
             throw ( cms::CMSException );
 
         /**
-         * Method called to acknowledge the message passed
-         * @param message the Message to Acknowlegde
+         * Method called to acknowledge the message passed, called from a message
+         * when the mode is client ack.
+         * @param message the Message to Acknowledge
          * @throw CMSException
          */
         virtual void acknowledgeMessage( const ActiveMQMessage* message )
@@ -156,6 +164,17 @@
     public:  // ActiveMQConsumer Methods
 
         /**
+         * Method called to acknowledge the message passed, ack it using
+         * the passed in ackType, see <code>Connector</code> for a list
+         * of the correct ack types.
+         * @param message the Message to Acknowledge
+         * @param ackType the Type of ack to send, (connector enum)
+         * @throw CMSException
+         */
+        virtual void acknowledge( const ActiveMQMessage* message, int ackType )
+            throw ( cms::CMSException );
+
+        /**
          * Get the Consumer information for this consumer
          * @return Pointer to a Consumer Info Object
          */
@@ -195,14 +214,6 @@
         virtual void purgeMessages() throw (exceptions::ActiveMQException);
 
         /**
-         * Destroys the message if the session is transacted, otherwise
-         * does nothing.
-         * @param message the message to destroy
-         */
-        virtual void destroyMessage( ActiveMQMessage* message )
-            throw (exceptions::ActiveMQException);
-
-        /**
          * Used by synchronous receive methods to wait for messages to come in.
          * @param timeout - The maximum number of milliseconds to wait before
          * returning.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.cpp Sun Aug 24 12:56:42
2008
@@ -57,7 +57,7 @@
     this->connection = connection;
     this->closed = false;
 
-    // Create a Transaction object only if the session is transactional
+    // Create a Transaction object only if the session is transacted
     if( isTransacted() ) {
         transaction =
             new ActiveMQTransaction(connection, this, properties );
@@ -87,7 +87,7 @@
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::close() throw ( cms::CMSException )
 {
-    // If we're already close, just get outta' here.
+    // If we're already closed, just return.
     if( closed ) {
         return;
     }
@@ -241,8 +241,7 @@
 
         // Create the consumer instance.
         ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            consumerInfo, this );
-
+            consumerInfo, this, this->transaction );
 
         // Add the consumer to the map.
         synchronized( &consumers ) {
@@ -299,7 +298,7 @@
 
         // Create the consumer instance.
         ActiveMQConsumer* consumer = new ActiveMQConsumer(
-            consumerInfo, this );
+            consumerInfo, this, this->transaction );
 
         // Add the consumer to the map.
         synchronized( &consumers ) {
@@ -621,37 +620,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSession::acknowledge( ActiveMQConsumer* consumer,
-                                   ActiveMQMessage* message )
-    throw ( cms::CMSException ) {
-
-    try{
-
-        if( closed ) {
-            throw ActiveMQException(
-                __FILE__, __LINE__,
-                "ActiveMQSession::acknowledgeMessage - Session Already Closed" );
-        }
-
-        // Stores the Message and its consumer in the transaction, if the
-        // session is a transactional one.
-        if( isTransacted() ) {
-            transaction->addToTransaction( message, consumer );
-        }
-
-        // Delegate to connector to ack this message.
-        return connection->getConnectionData()->
-            getConnector()->acknowledge(
-                sessionInfo,
-                consumer->getConsumerInfo(),
-                dynamic_cast< cms::Message* >( message ) );
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
-    AMQ_CATCHALL_THROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void ActiveMQSession::send( cms::Message* message, ActiveMQProducer* producer )
     throw ( cms::CMSException ) {
 
@@ -663,7 +631,7 @@
                 "ActiveMQSession::onProducerClose - Session Already Closed" );
         }
 
-        // Send via the connection syncrhronously.
+        // Send via the connection synchronously.
         connection->getConnectionData()->
             getConnector()->send( message, producer->getProducerInfo() );
     }
@@ -698,11 +666,9 @@
             // Remove the dispatcher for the Connection
             connection->removeDispatcher( consumer );
 
-            // Remove this consumer from the Transaction if we are
-            // transactional
+            // Remove this consumer from the Transaction if we are transacted
             if( transaction != NULL ) {
-                transaction->removeFromTransaction(
-                    consumer->getConsumerId() );
+                transaction->removeFromTransaction( consumer->getConsumerId() );
             }
 
             ActiveMQConsumer* obj = NULL;

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQSession.h Sun Aug 24 12:56:42
2008
@@ -343,16 +343,6 @@
             throw ( cms::CMSException );
 
         /**
-         * Called to acknowledge the receipt of a message.
-         * @param The consumer that received the message
-         * @param The Message to acknowledge.
-         * @throws CMSException
-         */
-        virtual void acknowledge( ActiveMQConsumer* consumer,
-                                  ActiveMQMessage* message )
-            throw ( cms::CMSException );
-
-        /**
          * This method gets any registered exception listener of this sessions
          * connection and returns it.  Mainly intended for use by the objects
          * that this session creates so that they can notify the client of
@@ -367,7 +357,14 @@
          * @return SessionInfo Pointer
          */
         virtual connector::SessionInfo* getSessionInfo() {
-            return sessionInfo;
+            return this->sessionInfo;
+        }
+
+        /**
+         * Gets the ActiveMQConnection that is associated with this session.
+         */
+        ActiveMQConnection* getConnection() const {
+            return this->connection;
         }
 
         /**

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.cpp Sun Aug 24
12:56:42 2008
@@ -127,6 +127,24 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::addToTransaction( cms::Message* message,
+                                            ActiveMQConsumer* consumer ) {
+
+    synchronized( &rollbackLock ) {
+
+        ActiveMQMessage* coreMessage = dynamic_cast<ActiveMQMessage*>( message );
+
+        if( coreMessage == NULL ) {
+            throw NullPointerException(
+                __FILE__, __LINE__, "Message is not a core::ActiveMQMessage derivation" );
+        }
+
+        // Store in the Multi Map
+        rollbackMap[consumer].push_back( coreMessage );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTransaction::removeFromTransaction( ActiveMQConsumer* consumer ) {
 
     try{
@@ -183,7 +201,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::commit() 
+void ActiveMQTransaction::commit()
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -198,6 +216,17 @@
         // Stop any deliveries
         session->stop();
 
+        // Now that the session is stopped, ack all messages we've
+        // delivered to the clients and placed in the Rollback map.
+        synchronized( &rollbackLock ) {
+
+            RollbackMap::iterator itr = rollbackMap.begin();
+
+            for(; itr != rollbackMap.end(); ++itr) {
+                ackMessages( itr->first, itr->second );
+            }
+        }
+
         // Commit the current Transaction
         connection->getConnectionData()->getConnector()->
             commit( transactionInfo, session->getSessionInfo() );
@@ -218,7 +247,7 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransaction::rollback() 
+void ActiveMQTransaction::rollback()
     throw ( activemq::exceptions::ActiveMQException ) {
 
     try{
@@ -249,8 +278,8 @@
         session->start();
 
         // Roolback the messages to the Session, since we have the lock on the
-        // rollbackLock, then no message will added to the transaction unitll we
-        // are done processing all the messages that we to redeliver and the map
+        // rollbackLock, no message will be added to the transaction until we
+        // are done processing all the messages that we need to re-deliver and the map
         // is cleared.
         synchronized( &rollbackLock ) {
 
@@ -284,20 +313,20 @@
             ActiveMQMessage* message = *itr;
             message->setRedeliveryCount( message->getRedeliveryCount() + 1 );
 
-            if( message->getRedeliveryCount() >= maxRedeliveries )
-            {
+            if( message->getRedeliveryCount() >= maxRedeliveries ) {
+
                 // Poison Ack the Message, we give up processing this one
                 connection->getConnectionData()->getConnector()->
                     acknowledge(
                         session->getSessionInfo(),
                         consumer->getConsumerInfo(),
-                        dynamic_cast< Message* >( message ),
+                        dynamic_cast<Message*>( message ),
                         Connector::ACK_TYPE_POISON );
 
-                // Won't redeliver this so we kill it here.
+                // Won't be re-delivered so it must be destroyed here.
                 delete message;
 
-                return;
+                continue;
             }
 
             DispatchData data( consumer->getConsumerInfo(), message );
@@ -308,3 +337,32 @@
     AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
 }
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::ackMessages( ActiveMQConsumer* consumer,
+                                       MessageList& messages )
+    throw ( activemq::exceptions::ActiveMQException ) {
+
+    try {
+
+        std::list<const cms::Message*> cmsMessages;
+
+        MessageList::iterator iter = messages.begin();
+        for( ; iter != messages.end(); ++iter ) {
+            cmsMessages.insert( cmsMessages.end(),
+                dynamic_cast<const cms::Message*>( *iter ) );
+        }
+
+        // Acknowledge the Messages as a single list, letting the connector do it
+        // in the most efficient manner it can for large message block acks.
+        connection->getConnectionData()->getConnector()->
+            acknowledge(
+                session->getSessionInfo(),
+                consumer->getConsumerInfo(),
+                cmsMessages,
+                Connector::ACK_TYPE_CONSUMED );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h?rev=688552&r1=688551&r2=688552&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/core/ActiveMQTransaction.h Sun Aug 24 12:56:42 2008
@@ -41,26 +41,26 @@
 
     /**
      * Transaction Management class, hold messages that are to be redelivered
-     * upon a request to rollback.  The Tranasction represents an always
+     * upon a request to roll-back.  The Transaction represents an always
      * running transaction, when it is committed or rolled back it silently
      * creates a new transaction for the next set of messages.  The only
-     * way to permanently end this tranaction is to delete it.
+     * way to permanently end this transaction is to delete it.
      *
      * Configuration options
      *
      * transaction.maxRedeliveryCount
-     *   Max number of times a message can be redelivered, if the session is
+     *   Max number of times a message can be re-delivered, if the session is
      *   rolled back more than this many time, the message is dropped.
      */
     class AMQCPP_API ActiveMQTransaction : public connector::TransactionInfo {
     private:
 
         // List type for holding messages
-        typedef std::list< ActiveMQMessage* > MessageList;
+        typedef std::list<ActiveMQMessage*> MessageList;
 
-        // Mapping of MessageListener Ids to Lists of Messages that are
-        // redelivered on a Rollback
-        typedef std::map< ActiveMQConsumer*, MessageList > RollbackMap;
+        // Mapping of ActiveMQConsumer pointers to Lists of Messages that are
+        // re-delivered on a Rollback
+        typedef std::map<ActiveMQConsumer*, MessageList> RollbackMap;
 
     private:
 
@@ -79,19 +79,16 @@
         // Lock object to protect the rollback Map
         decaf::util::concurrent::Mutex rollbackLock;
 
-        // Max number of redeliveries before we quit
+        // Max number of re-deliveries before we quit
         int maxRedeliveries;
 
-        // Mutex that is signaled when all tasks complete.
-        decaf::util::concurrent::Mutex tasksDone;
-
     public:
 
         /**
          * Constructor
          * @param connection - Connection to the Broker
          * @param session - the session that contains this transaction
-         * @param properties - configuratoin parameters for this object
+         * @param properties - configuration parameters for this object
          */
         ActiveMQTransaction( ActiveMQConnection* connection,
                              ActiveMQSession* session,
@@ -109,6 +106,15 @@
                                        ActiveMQConsumer* consumer );
 
         /**
+         * Adds the Message as a part of the Transaction for the specified
+         * ActiveMQConsumer.
+         * @param message - Message to Transact
+         * @param consumer - Listener to redeliver to on Rollback
+         */
+        virtual void addToTransaction( cms::Message* message,
+                                       ActiveMQConsumer* consumer );
+
+        /**
          * Removes the ActiveMQConsumer and all of its transacted
          * messages from the Transaction, this is usually only done when
          * an ActiveMQConsumer is destroyed.
@@ -148,7 +154,7 @@
     public:   // TransactionInfo Interface
 
         /**
-         * Gets the Transction Id
+         * Gets the Transaction Id
          * @return integral value of Id
          */
         virtual long long getTransactionId() const {
@@ -156,7 +162,7 @@
         }
 
         /**
-         * Sets the Transction Id
+         * Sets the Transaction Id
          * @param id - integral value of Id
          */
         virtual void setTransactionId( long long id ) {
@@ -190,16 +196,28 @@
         virtual void clearTransaction();
 
         /**
-         * Redelivers each message that is in the Message List to the specified
-         * consumer, throwing messages away as they hit their max redilviery
+         * Re-delivers each message that is in the Message List to the specified
+         * consumer, throwing messages away as they hit their max re-delivery
          * count.
-         * @param consumer - the ActiveMQConsumer to redeliver to
+         * @param consumer - the ActiveMQConsumer to re-deliver to
          * @param messages - the list of messages that should be sent.
          * @throws ActiveMQException if an error occurs.
          */
         virtual void redeliverMessages( ActiveMQConsumer* consumer,
                                         MessageList& messages )
-                                            throw ( exceptions::ActiveMQException );
+            throw ( exceptions::ActiveMQException );
+
+        /**
+         * Acknowledges each message that is in the Message List on behalf of the
+         * specified consumer.
+         * @param consumer - the ActiveMQConsumer whose messages are being acknowledged
+         * @param messages - the list of messages that should be acknowledged.
+         * @throws ActiveMQException if an error occurs.
+         */
+        virtual void ackMessages( ActiveMQConsumer* consumer,
+                                  MessageList& messages )
+            throw ( exceptions::ActiveMQException );
+
     };
 
 }}



Mime
View raw message