activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r524628 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq: connector/openwire/OpenWireConnector.cpp core/ActiveMQConsumer.cpp core/ActiveMQConsumer.h core/ActiveMQSession.h
Date Sun, 01 Apr 2007 16:43:27 GMT
Author: nmittler
Date: Sun Apr  1 09:43:26 2007
New Revision: 524628

URL: http://svn.apache.org/viewvc?view=rev&rev=524628
Log:
AMQCPP-83 - updates to fix openwire durable test

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp?view=diff&rev=524628&r1=524627&r2=524628
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireConnector.cpp
Sun Apr  1 09:43:26 2007
@@ -960,7 +960,7 @@
 void OpenWireConnector::acknowledge( const SessionInfo* session,
                                      const ConsumerInfo* consumer,
                                      const cms::Message* message,
-                                     AckType ackType = ConsumedAck )
+                                     AckType ackType )
     throw ( ConnectorException )
 {
     try {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?view=diff&rev=524628&r1=524627&r2=524628
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Sun
Apr  1 09:43:26 2007
@@ -21,12 +21,14 @@
 #include <activemq/exceptions/IllegalArgumentException.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/ActiveMQMessage.h>
+#include <activemq/util/Date.h>
 #include <cms/ExceptionListener.h>
 
 using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::util;
 using namespace activemq::connector;
 using namespace activemq::exceptions;
 using namespace activemq::concurrent;
@@ -130,8 +132,9 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )
-{
+ActiveMQMessage* ActiveMQConsumer::dequeue(int timeout) 
+    throw ( cms::CMSException ) 
+{        
     try
     {
         if( closed )
@@ -139,42 +142,98 @@
             throw InvalidStateException(
                 __FILE__, __LINE__,
                 "ActiveMQConsumer::receive - This Consumer is closed" );
-        }
+        }        
 
         synchronized( &unconsumedMessages )
         {
-            // Check for empty in case of spurious wakeup, or race to
-            // queue lock.
-            while( !closed && unconsumedMessages.empty() )
-            {
-                unconsumedMessages.wait();
-            }
-
-            // This will only happen when this object is being
-            // closed in another thread context - kind of
-            // scary.
-            if( closed ){
-                throw ActiveMQException(
-                    __FILE__, __LINE__,
-                    "Consumer is being closed in another thread" );
-            }
-
-            // Fetch the Message then copy it so it can be handed off
-            // to the user.
-            DispatchData data = unconsumedMessages.pop();
-            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
-            cms::Message* result = message->clone();
-
-            // The Message is cleaned up here if the Session is not
-            // transacted, otherwise we let the transaction clean up
-            // this message as it will have already been ack'd and
-            // stored for later redelivery.
-            destroyMessage( message );
+            // Calculate the deadline
+            long long deadline = 0;
+            if (timeout > 0) {
+                deadline = Date::getCurrentTimeMilliseconds() + timeout;
+            }
+                
+            // Loop until the time is up or we get a non-expired message
+            while( true ) {
+                
+                // Wait until either the deadline is met, a message arrives, or
+                // we've closed.
+                while( !closed && unconsumedMessages.empty() && timeout !=
0 )
+                {
+                    if( timeout < 0 ) {
+                        unconsumedMessages.wait();
+                    } else if( timeout > 0 ) {
+                        unconsumedMessages.wait(timeout);
+                        timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()),
0);
+                    }
+                }
+                
+                if( unconsumedMessages.empty() ) {
+                    return NULL;
+                }
+    
+                // Fetch the Message then copy it so it can be handed off
+                // to the user.
+                DispatchData data = unconsumedMessages.pop();
+                
+                // Get the message.
+                ActiveMQMessage* message = data.getMessage();
+                
+                // If it's expired, process the message and then go back to waiting.
+                if( message->isExpired() ) {
+                    
+                    beforeMessageIsConsumed(message);
+                    afterMessageIsConsumed(message, true);
+                    if (timeout > 0) {
+                        timeout = std::max((int)(deadline - Date::getCurrentTimeMilliseconds()),
0);
+                    }
+                    
+                    // Go back to waiting for a non-expired message.
+                    continue;                    
+                } 
+                
+                // Return the message.
+                return message;
+                
+            } // while( true )
+            
+        } // synchronized( &unconsumedMessages )
 
-            return result;
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+    
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* ActiveMQConsumer::receive() throw ( cms::CMSException )
+{
+    try
+    {
+        if( closed )
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        return NULL;
+        // Wait for the next message.
+        ActiveMQMessage* msg = dequeue( -1 );
+        if( msg == NULL ) {
+            return NULL;
+        }
+        
+        // Message preprocessing
+        beforeMessageIsConsumed(msg);
+        
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+        
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed(msg, false);
+        
+        // Return the cloned message.
+        return clonedMsg;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -193,44 +252,24 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &unconsumedMessages )
-        {
-            // Check for empty, and wait if its not
-            if( !closed && unconsumedMessages.empty() ){
-
-                unconsumedMessages.wait( millisecs );
-
-                // if its still empty...bail
-                if( unconsumedMessages.empty() ) {
-                    return NULL;
-                }
-            }
-
-            // This will only happen when this object is being
-            // closed in another thread context - kind of
-            // scary.
-            if( closed ){
-                throw ActiveMQException(
-                    __FILE__, __LINE__,
-                    "Consumer is being closed in another thread" );
-            }
-
-            // Fetch the Message then copy it so it can be handed off
-            // to the user.
-            DispatchData data = unconsumedMessages.pop();
-            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
-            cms::Message* result = message->clone();
-
-            // The Message is cleaned up here if the Session is not
-            // transacted, otherwise we let the transaction clean up
-            // this message as it will have already been ack'd and
-            // stored for later redelivery.
-            destroyMessage( message );
-
-            return result;
-        }
-
-        return NULL;
+        // Wait for the next message.
+        ActiveMQMessage* msg = dequeue( millisecs );
+        if( msg == NULL ) {
+            return NULL;
+        }
+        
+        // Message preprocessing
+        beforeMessageIsConsumed(msg);
+        
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+        
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed(msg, false);
+        
+        // Return the cloned message.
+        return clonedMsg;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -249,27 +288,24 @@
                 "ActiveMQConsumer::receive - This Consumer is closed" );
         }
 
-        synchronized( &unconsumedMessages )
-        {
-            if( !unconsumedMessages.empty() )
-            {
-                // Fetch the Message then copy it so it can be handed off
-                // to the user.
-                DispatchData data = unconsumedMessages.pop();
-                cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
-                cms::Message* result = message->clone();
-
-                // The Message is cleaned up here if the Session is not
-                // transacted, otherwise we let the transaction clean up
-                // this message as it will have already been ack'd and
-                // stored for later redelivery.
-                destroyMessage( message );
-
-                return result;
-            }
-        }
-
-        return NULL;
+        // Get the next available message, if there is one.
+        ActiveMQMessage* msg = dequeue( 0 );
+        if( msg == NULL ) {
+            return NULL;
+        }
+        
+        // Message preprocessing
+        beforeMessageIsConsumed(msg);
+        
+        // Need to clone the message because the user is responsible for freeing
+        // its copy of the message.
+        cms::Message* clonedMsg = dynamic_cast<cms::Message*>(msg)->clone();
+        
+        // Post processing (may result in the message being deleted)
+        afterMessageIsConsumed(msg, false);
+        
+        // Return the cloned message.
+        return clonedMsg;
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -311,6 +347,40 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::beforeMessageIsConsumed( ActiveMQMessage* message ) {
+    
+    // If the Session is in ClientAcknowledge mode, then we set the
+    // handler in the message to this object and send it out.  Otherwise
+    // we ack it here for all the other Modes.
+    if( session->isClientAcknowledge() ) {
+
+        // Register ourself so that we can handle the Message's
+        // acknowledge method.
+        message->setAckHandler( this );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::afterMessageIsConsumed( ActiveMQMessage* message, 
+    bool messageExpired )
+{
+    try
+    {
+        if( !session->isClientAcknowledge() ) {
+            session->acknowledge( this, message );
+        }
+        
+        // The Message is cleaned up here if the Session is not
+        // transacted, otherwise we let the transaction clean up
+        // this message as it will have already been ack'd and
+        // stored for later redelivery.
+        destroyMessage( message );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void ActiveMQConsumer::acknowledgeMessage( const ActiveMQMessage* message )
    throw ( cms::CMSException )
 {
@@ -326,7 +396,7 @@
         // Delegate the Ack to the Session, we cast away copnstness since
         // in a transactional session we might need to redeliver this
         // message and update its data.
-        session->acknowledge( this, const_cast< ActiveMQMessage*>( message ) );
+        session->acknowledge( this, const_cast<ActiveMQMessage*>(message) );
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
     AMQ_CATCHALL_THROW( ActiveMQException )
@@ -346,26 +416,21 @@
 
             // stop now, don't queue
             return;
-        }
-
-        // If the Session is in ClientAcknowledge mode, then we set the
-        // handler in the message to this object and send it out.  Otherwise
-        // we ack it here for all the other Modes.
-        if( session->getAcknowledgeMode() == Session::CLIENT_ACKNOWLEDGE ) {
-
-            // Register ourself so that we can handle the Message's
-            // acknowledge method.
-            message->setAckHandler( this );
-
-        } else {
-            session->acknowledge( this, message );
-        }
+        }        
 
         // If we have a listener, send the message.
         if( listener != NULL ) {
-            cms::Message* message = dynamic_cast<cms::Message*>(data.getMessage());
-            listener->onMessage( message );
-            destroyMessage( message );
+            ActiveMQMessage* message = data.getMessage();
+            
+            // Preprocessing.
+            beforeMessageIsConsumed( message );
+            
+            // Notify the listener
+            listener->onMessage( dynamic_cast<cms::Message*>(message) );
+            
+            // Postprocessing
+            afterMessageIsConsumed( message, false );
+            
         } else {
 
             // No listener, add it to the unconsumed messages list
@@ -391,7 +456,7 @@
                 // destroy these messages if this is not a transacted
                 // session, if it is then the tranasction will clean
                 // the messages up.
-                destroyMessage( dynamic_cast<cms::Message*>(unconsumedMessages.pop().getMessage())
);
+                destroyMessage( unconsumedMessages.pop().getMessage() );
             }
         }
     }
@@ -400,7 +465,9 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQConsumer::destroyMessage( Message* message ) throw ( ActiveMQException ){
+void ActiveMQConsumer::destroyMessage( ActiveMQMessage* message ) 
+throw ( ActiveMQException )
+{
 
     try
     {

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?view=diff&rev=524628&r1=524627&r2=524628
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Sun
Apr  1 09:43:26 2007
@@ -42,19 +42,29 @@
     {
     private:
 
-        // The session that owns this Consumer
+        /**
+         * The session that owns this Consumer
+         */
         ActiveMQSession* session;
 
-        // The Consumer info for this Consumer
+        /**
+         * The Consumer info for this Consumer
+         */
         connector::ConsumerInfo* consumerInfo;
 
-        // The Message Listener for this Consumer
+        /**
+         * The Message Listener for this Consumer
+         */
         cms::MessageListener* listener;
 
-        // Queue of unconsumed messages.
+        /**
+         * Queue of unconsumed messages.
+         */
         util::Queue<DispatchData> unconsumedMessages;
 
-        // Boolean that indicates if the consumer has been closed
+        /**
+         * Boolean that indicates if the consumer has been closed
+         */
         bool closed;
 
     public:
@@ -174,8 +184,36 @@
          * does nothing.
          * @param message the message to destroy
          */
-        virtual void destroyMessage( cms::Message* message )
+        virtual void destroyMessage( ActiveMQMessage* message )
             throw (exceptions::ActiveMQException);
+            
+        /**
+         * Used by synchronous receive methods to wait for messages to come in.
+         * @param timeout - The maximum number of milliseconds to wait before 
+         * returning.
+         * If -1, it will block until a messages is received or this consumer 
+         * is closed.
+         * If 0, will not block at all.  If > 0, will wait at a maximum the 
+         * specified number of milliseconds before returning.
+         * @return the message, if received within the allotted time.  
+         * Otherwise NULL.
+         * @throws InvalidStateException if this consumer is closed upon 
+         * entering this method.
+         */
+        ActiveMQMessage* dequeue(int timeout) throw ( cms::CMSException );
+        
+        /**
+         * Pre-consume processing
+         * @param message - the message being consumed.
+         */
+        virtual void beforeMessageIsConsumed( ActiveMQMessage* message );
+        
+        /**
+         * Post-consume processing
+         * @param message - the consumed message
+         * @param messageExpired - flag indicating if the message has expired.
+         */
+        virtual void afterMessageIsConsumed( ActiveMQMessage* message, bool messageExpired
);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?view=diff&rev=524628&r1=524627&r2=524628
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Sun
Apr  1 09:43:26 2007
@@ -131,6 +131,16 @@
          */
         bool isStarted() const;
         
+        bool isAutoAcknowledge() const {
+            return sessionInfo->getAckMode() == cms::Session::AUTO_ACKNOWLEDGE;
+        }
+        bool isDupsOkAcknowledge() const {
+            return sessionInfo->getAckMode() == cms::Session::DUPS_OK_ACKNOWLEDGE;
+        }
+        bool isClientAcknowledge() const {
+            return sessionInfo->getAckMode() == cms::Session::CLIENT_ACKNOWLEDGE;
+        }
+        
         /**
          * Fires the given exception to the exception listener of the connection
          */



Mime
View raw message