activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r620209 - in /activemq/activemq-cpp/trunk/src/main/activemq/cmsutil: CachedConsumer.h CachedProducer.h CmsTemplate.cpp CmsTemplate.h PooledSession.cpp PooledSession.h
Date Sat, 09 Feb 2008 22:47:33 GMT
Author: nmittler
Date: Sat Feb  9 14:47:32 2008
New Revision: 620209

URL: http://svn.apache.org/viewvc?rev=620209&view=rev
Log:
AMQCPP-152 - Adding synchronous receive to CmsTemplate

Added:
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h
Modified:
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp
    activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h

Added: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h?rev=620209&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedConsumer.h Sat Feb  9 14:47:32
2008
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+#define ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_
+
+#include <cms/MessageConsumer.h>
+
+namespace activemq {
+namespace cmsutil {
+
+    /**
+     * A cached message consumer contained within a pooled session.  All
+     * receive/listener/selector calls are forwarded to the wrapped consumer.
+     */
+    class CachedConsumer : public cms::MessageConsumer {
+    private:
+        // Wrapped consumer; never deleted by this wrapper.
+        cms::MessageConsumer* consumer;
+
+    public:
+
+        // explicit: a raw consumer pointer must not convert silently.
+        explicit CachedConsumer( cms::MessageConsumer* consumer )
+        :   consumer( consumer ) {}
+
+        virtual ~CachedConsumer() {}
+
+        /**
+         * Does nothing - the real consumer resource will be closed
+         * by the lifecycle manager.
+         */
+        virtual void close() throw( cms::CMSException ){
+            // Do nothing.
+        }
+
+        virtual cms::Message* receive() throw ( cms::CMSException ) {
+            return consumer->receive();
+        }
+
+        virtual cms::Message* receive( int millisecs ) throw ( cms::CMSException ) {
+            return consumer->receive(millisecs);
+        }
+
+        virtual cms::Message* receiveNoWait() throw ( cms::CMSException ) {
+            return consumer->receiveNoWait();
+        }
+
+        virtual void setMessageListener( cms::MessageListener* listener ) {
+            consumer->setMessageListener(listener);
+        }
+
+        virtual cms::MessageListener* getMessageListener() const {
+            return consumer->getMessageListener();
+        }
+
+        virtual std::string getMessageSelector() const
+            throw ( cms::CMSException ) {
+            return consumer->getMessageSelector();
+        }
+    };
+
+}}
+
+#endif /*ACTIVEMQ_CMSUTIL_CACHEDCONSUMER_H_*/

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CachedProducer.h Sat Feb  9 14:47:32
2008
@@ -24,7 +24,7 @@
 namespace cmsutil {
 
     /**
-     * A cached message roducer contained within a pooled session.
+     * A cached message producer contained within a pooled session.
      */
     class CachedProducer : public cms::MessageProducer {
     private:

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.cpp Sat Feb  9 14:47:32
2008
@@ -242,6 +242,37 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
+        cms::Destination* dest,
+        const std::string& selector,
+        bool noLocal ) throw (cms::CMSException) {
+
+    // Returns a consumer for dest (or for the default destination when
+    // dest is NULL).  A PooledSession hands back its cached consumer;
+    // any other session type creates a fresh one.
+    try {
+
+        if( dest == NULL ) {
+            dest = resolveDefaultDestination(session);
+        }
+
+        // The cache is only available on PooledSession, so probe the
+        // dynamic type of the session first.
+        PooledSession* pooled = dynamic_cast<PooledSession*>(session);
+        if( pooled == NULL ) {
+
+            // Plain session: no cache to consult.
+            return session->createConsumer(dest, selector, noLocal);
+        }
+
+        return pooled->createCachedConsumer(dest, selector, noLocal);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( IllegalStateException, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 void CmsTemplate::destroyProducer( cms::MessageProducer*& producer) 
 throw (cms::CMSException) {
 
@@ -266,21 +297,6 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::MessageConsumer* CmsTemplate::createConsumer(cms::Session* session,
-        cms::Destination* dest, const std::string& messageSelector)
-        throw (cms::CMSException) {
-
-    try {
-        cms::MessageConsumer* consumer = session->createConsumer(dest,
-                messageSelector, 
-                isNoLocal());
-        
-        return consumer;
-    }
-    AMQ_CATCH_RETHROW( ActiveMQException )
-}
-
-////////////////////////////////////////////////////////////////////////////////
 void CmsTemplate::destroyConsumer( cms::MessageConsumer*& consumer) 
 throw (cms::CMSException) {
 
@@ -290,12 +306,17 @@
     
     try {        
         
-        // Close the consumer, then destroy it.
+        // Close the producer, then destroy it.
         consumer->close();                
     }
     AMQ_CATCH_NOTHROW( cms::CMSException )
     
-    delete consumer;
+    // Destroy if it's not a cached consumer.
+    CachedConsumer* cachedConsumer = dynamic_cast<CachedConsumer*>(consumer);
+    if( cachedConsumer == NULL ) {
+        delete consumer;
+    }
+    
     consumer = NULL;
 }
 
@@ -437,7 +458,7 @@
 throw (cms::CMSException, IllegalStateException)  {
     
     try {
-        SenderExecutor senderExecutor(messageCreator, this);
+        SendExecutor senderExecutor(messageCreator, this);
         execute(&senderExecutor);
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -451,7 +472,7 @@
 throw (cms::CMSException, IllegalStateException) {
     
     try {        
-        SenderExecutor senderExecutor(messageCreator, this);
+        SendExecutor senderExecutor(messageCreator, this);
         execute(dest, &senderExecutor);
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -464,7 +485,7 @@
 throw (cms::CMSException, IllegalStateException) {
     
     try {
-        SenderExecutor senderExecutor(messageCreator, this);
+        SendExecutor senderExecutor(messageCreator, this);
         execute(destinationName, &senderExecutor);
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -507,4 +528,84 @@
     }
 }
 
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* CmsTemplate::doReceive(cms::MessageConsumer* consumer,
+        long long receiveTime )
+throw (cms::CMSException) {
+
+    // Performs one receive on consumer.  receiveTime picks the call:
+    // RECEIVE_TIMEOUT_NO_WAIT -> receiveNoWait(),
+    // RECEIVE_TIMEOUT_INDEFINITE_WAIT -> receive(), else receive(receiveTime).
+    cms::Message* message = NULL;
+
+    try {
+
+        if( consumer == NULL ) {
+            // Thrown by value so the by-reference handler below sees it.
+            throw ActiveMQException(__FILE__, __LINE__, "consumer is NULL");
+        }
+
+        switch( receiveTime ) {
+        case RECEIVE_TIMEOUT_NO_WAIT:
+            message = consumer->receiveNoWait();
+            break;
+        case RECEIVE_TIMEOUT_INDEFINITE_WAIT:
+            message = consumer->receive();
+            break;
+        default:
+            message = consumer->receive(receiveTime);
+            break;
+        }
+
+    } catch( ActiveMQException& e) {
+        e.setMark(__FILE__, __LINE__ );
+        destroyMessage(message);
+        throw;  // rethrow the same object, keeping its dynamic type
+    }
+
+    // Hand the received message to the caller.
+    return message;
+}
 
+////////////////////////////////////////////////////////////////////////////////
+void CmsTemplate::ReceiveExecutor::doInCms(cms::Session* session)
+    throw (cms::CMSException) {
+
+    // Creates a consumer on session, receives one message into the
+    // 'message' member and destroys the consumer again.
+    cms::MessageConsumer* consumer = NULL;
+
+    try {
+
+        // Create the consumer resource.
+        consumer = parent->createConsumer(session, getDestination(session),
+                selector, noLocal);
+
+        // Receive the message.
+        message = parent->doReceive(consumer, receiveTime);
+
+        // Destroy the consumer resource.
+        parent->destroyConsumer(consumer);
+
+    } catch( ActiveMQException& e) {
+
+        e.setMark(__FILE__, __LINE__ );
+
+        // Release whatever was acquired before the failure.
+        parent->destroyMessage(message);
+        parent->destroyConsumer(consumer);
+        throw;  // rethrow the same object, keeping its dynamic type
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Destination* CmsTemplate::ResolveReceiveExecutor::getDestination(
+        cms::Session* session ) throw (cms::CMSException) {
+    // Resolve the stored destination name via the owning template.
+    try {
+        CmsTemplate* owner = parent;
+        return owner->resolveDestinationName(session, destinationName);
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/CmsTemplate.h Sat Feb  9 14:47:32
2008
@@ -145,9 +145,9 @@
         /**
          * Session callback that sends to the given destination.
          */
-        class SenderExecutor;
-        friend class SenderExecutor;
-        class SenderExecutor : public ProducerCallback {
+        class SendExecutor;
+        friend class SendExecutor;
+        class SendExecutor : public ProducerCallback {
         private:
             
             MessageCreator* messageCreator;
@@ -155,13 +155,13 @@
             
         public:
             
-            SenderExecutor( MessageCreator* messageCreator,
+            SendExecutor( MessageCreator* messageCreator,
                     CmsTemplate* parent) {
                 this->messageCreator = messageCreator;
                 this->parent = parent;
             }
             
-            virtual ~SenderExecutor() {}
+            virtual ~SendExecutor() {}
             
             virtual void doInCms(cms::Session* session,
                     cms::MessageProducer* producer) throw (cms::CMSException) {
@@ -169,6 +169,75 @@
             }
         };
         
+        /**
+         * Session callback that receives from the given destination and
+         * keeps the result in its 'message' member.
+         */
+        class ReceiveExecutor;
+        friend class ReceiveExecutor;
+        class ReceiveExecutor : public SessionCallback {
+        protected:
+            cms::Destination* destination;
+            std::string selector;
+            bool noLocal;
+            cms::Message* message;
+            CmsTemplate* parent;
+            long long receiveTime;
+
+        public:
+            ReceiveExecutor( CmsTemplate* parent,
+                    cms::Destination* destination,
+                    const std::string& selector,
+                    bool noLocal,
+                    long long receiveTime)
+            :   destination( destination ),
+                selector( selector ),
+                noLocal( noLocal ),
+                message( NULL ),
+                parent( parent ),
+                receiveTime( receiveTime ) {}
+
+            virtual ~ReceiveExecutor() {}
+
+            virtual void doInCms(cms::Session* session)
+                throw (cms::CMSException);
+
+            virtual cms::Destination* getDestination(
+                    cms::Session* session AMQCPP_UNUSED)
+                throw (cms::CMSException) {
+                return destination;
+            }
+        };
+        
+        /**
+         * Receive callback for a destination identified by name; the name
+         * is resolved against the session in getDestination().
+         */
+        class ResolveReceiveExecutor;
+        friend class ResolveReceiveExecutor;
+        class ResolveReceiveExecutor : public ReceiveExecutor {
+        private:
+
+            // Name passed to the resolver by getDestination().
+            std::string destinationName;
+
+        public:
+
+            ResolveReceiveExecutor(CmsTemplate* parent,
+                    const std::string& selector,
+                    bool noLocal,
+                    long long receiveTime,
+                    const std::string& destinationName)
+            :   ReceiveExecutor( parent, NULL, selector, noLocal, receiveTime),
+                destinationName( destinationName ) {
+            }
+
+            virtual ~ResolveReceiveExecutor() {}
+
+            virtual cms::Destination* getDestination(cms::Session* session)
+                throw (cms::CMSException);
+        };
+        
     private:
                 
         static const int NUM_SESSION_POOLS = (int)cms::Session::SESSION_TRANSACTED + 1;
@@ -566,22 +635,22 @@
          * @throws cms::CMSException thrown if the CMS methods throw.
          */
         void destroyProducer( cms::MessageProducer*& producer ) throw (cms::CMSException);
-    
+                
         /**
          * Allocates a consumer initialized with the proper values.
          * 
          * @param session
          *          The session from which to create a consumer
          * @param dest
-         *          The destination for which to create the consumer
-         * @param messageSelector
-         *          The message selector for the consumer.
-         * @return the new consumer
-         * @throws cms::CMSException if the CMS methods throw.
+         *          The destination for which to create the consumer.  If
+         *          this is NULL, the default will be used.
+         * @return the consumer
+         * @throws cms::CMSException thrown by the CMS API
          */
         cms::MessageConsumer* createConsumer(cms::Session* session,
-                cms::Destination* dest, const std::string& messageSelector)
-        throw (cms::CMSException);
+                cms::Destination* dest,
+                const std::string& selector,
+                bool noLocal ) throw (cms::CMSException);
         
         /**
          * Closes and destroys a consumer resource
@@ -611,6 +680,18 @@
         void doSend(cms::Session* session,
                 cms::MessageProducer* producer, 
                 MessageCreator* messageCreator) throw (cms::CMSException);
+        
+        /**
+         * Receives a message from a destination.
+         * @param consumer 
+         *          the consumer to receive from
+         * @param receiveTime 
+         *          the time to wait for the receive.
+         * @return the message that was read
+         * @throws cms::CMSException thrown if the CMS API throws.
+         */
+        cms::Message* doReceive(cms::MessageConsumer* consumer,
+                long long receiveTime ) throw (cms::CMSException);
         
         /**
          * Resolves the default destination and returns it.

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.cpp Sat Feb  9 14:47:32
2008
@@ -33,11 +33,19 @@
 ////////////////////////////////////////////////////////////////////////////////
 PooledSession::~PooledSession(){
     
+    // Destroy cached producers.
     std::vector<CachedProducer*> cachedProducers = producerCache.getValues();
     for( std::size_t ix = 0; ix < cachedProducers.size(); ++ix ) {
         delete cachedProducers[ix];
     }
     cachedProducers.clear();
+    
+    // Destroy cached consumers.
+    std::vector<CachedConsumer*> cachedConsumers = consumerCache.getValues();
+    for( std::size_t ix = 0; ix < cachedConsumers.size(); ++ix ) {
+        delete cachedConsumers[ix];
+    }
+    cachedConsumers.clear();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -59,12 +67,12 @@
             throw ActiveMQException(__FILE__, __LINE__, "destination is NULL");
         }
         
-        std::string destName = getUniqueDestName(destination);
+        std::string key = getUniqueDestName(destination);
         
         // Check the cache - add it if necessary.
         CachedProducer* cachedProducer = NULL;
         try {            
-            cachedProducer = producerCache.getValue(destName);            
+            cachedProducer = producerCache.getValue(key);            
         } catch( decaf::lang::exceptions::NoSuchElementException& e ) {
             
             // No producer exists for this destination - start by creating
@@ -78,7 +86,7 @@
             cachedProducer = new CachedProducer(p);
             
             // Add it to the cache.
-            producerCache.setValue(destName, cachedProducer);
+            producerCache.setValue(key, cachedProducer);
         }
         
         return cachedProducer;
@@ -88,18 +96,65 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
+cms::MessageConsumer* PooledSession::createCachedConsumer(
+    const cms::Destination* destination,
+    const std::string& selector,
+    bool noLocal) throw ( cms::CMSException ) {
+
+    // Returns the cached consumer for (destination, selector, noLocal),
+    // creating, registering and caching one on first use.
+
+    try {
+
+        if( destination == NULL ) {
+            throw ActiveMQException(__FILE__, __LINE__, "destination is NULL");
+        }
+
+        // Cache key: the unique destination name followed by the
+        // selector and the noLocal flag.
+        std::string key = getUniqueDestName(destination) +
+            "s=" + selector + ",nl=" + (noLocal? "t" : "f");
+
+        try {
+            // Cache hit - reuse the wrapper created earlier.
+            return consumerCache.getValue(key);
+        } catch( decaf::lang::exceptions::NoSuchElementException& e ) {
+            // Cache miss - a new consumer is built below.
+        }
+
+        // No consumer exists for this key - start by creating a new
+        // consumer resource.
+        cms::MessageConsumer* created =
+            session->createConsumer(destination, selector, noLocal);
+
+        // Add the consumer resource to the resource lifecycle manager.
+        pool->getResourceLifecycleManager()->addMessageConsumer(created);
+
+        // Wrap it and remember the wrapper under the key.
+        CachedConsumer* wrapper = new CachedConsumer(created);
+        consumerCache.setValue(key, wrapper);
+
+        return wrapper;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
 std::string PooledSession::getUniqueDestName( const cms::Destination* dest ) {
     
-    std::string destName;
+    std::string destName = "[";
     const cms::Queue* queue = dynamic_cast<const cms::Queue*>(dest);
     if( queue != NULL ) {
-        destName = "q:" + queue->getQueueName();
+        destName += "q:" + queue->getQueueName();
     } else {
         const cms::Topic* topic = dynamic_cast<const cms::Topic*>(dest);
         if( topic != NULL ) {
-            destName = "t:" + topic->getTopicName();
+            destName += "t:" + topic->getTopicName();
         }
     }
+    
+    destName += "]";
     
     return destName;
 }

Modified: activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h?rev=620209&r1=620208&r2=620209&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h (original)
+++ activemq/activemq-cpp/trunk/src/main/activemq/cmsutil/PooledSession.h Sat Feb  9 14:47:32
2008
@@ -21,6 +21,7 @@
 #include <cms/Session.h>
 #include <decaf/util/Map.h>
 #include <activemq/cmsutil/CachedProducer.h>
+#include <activemq/cmsutil/CachedConsumer.h>
 
 namespace activemq {
 namespace cmsutil {
@@ -41,6 +42,8 @@
         
         decaf::util::Map<std::string, CachedProducer*> producerCache;
         
+        decaf::util::Map<std::string, CachedConsumer*> consumerCache;
+        
     public:
         
     	PooledSession( SessionPool* pool, cms::Session* session );
@@ -111,6 +114,25 @@
                 throw ( cms::CMSException ) {
             return session->createDurableConsumer(destination, name, selector, noLocal);
         }
+        
+        /**
+         * First checks the internal consumer cache and creates one if none exists
+         * for the given destination, selector, noLocal.  If created, the consumer is
+         * added to the pool's lifecycle manager.
+         * 
+         * @param destination
+         *          the destination to receive on
+         * @param selector
+         *          the selector to use
+         * @param noLocal
+         *          whether or not to receive messages from the same connection
+         * @return the consumer resource
+         * @throws cms::CMSException if something goes wrong.
+         */
+        virtual cms::MessageConsumer* createCachedConsumer(
+                const cms::Destination* destination,
+                const std::string& selector,
+                bool noLocal) throw ( cms::CMSException );
         
         virtual cms::MessageProducer* createProducer( const cms::Destination* destination
)
             throw ( cms::CMSException ) {



Mime
View raw message