activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r418749 [6/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/ac...
Date Mon, 03 Jul 2006 11:51:54 GMT
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionData.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionData.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionData.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionData.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONDATA_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONDATA_H_
+
+#include <activemq/connector/Connector.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+#include <activemq/util/Properties.h>
+
+namespace activemq{
+namespace core{
+
+    /**
+     * Container of the data that is needed when creating a new connection
+     * object.  Each ActiveMQConnection owns one of these objects.  The
+     * container owns the Connector, Transport and Properties handed to its
+     * constructor: its destructor closes the Connector and the Transport
+     * and deletes all three objects.
+     *
+     * NOTE(review): the class holds owning raw pointers but does not
+     * disable copying; a copied instance would delete the same objects a
+     * second time.  Confirm that instances are never copied.
+     */
+    class ActiveMQConnectionData
+    {
+    private:
+
+        // Connector Object, deleted by the destructor
+        connector::Connector* connector;
+
+        // Transport we are using, deleted by the destructor
+        transport::Transport* transport;
+
+        // Properties used to configure this connection, deleted by the
+        // destructor.
+        util::Properties* properties;
+
+    public:
+
+        /**
+         * Creates a new Connection Data object, passing it the data that
+         * it will manage for the parent connection object.
+         * @param connector the Connector instance to manage
+         * @param transport the Transport instance to manage
+         * @param properties the Properties used to configure the connection
+         * @throw IllegalArgumentException if any of the arguments is NULL
+         */
+        ActiveMQConnectionData( connector::Connector* connector,
+                                transport::Transport* transport,
+                                util::Properties* properties )
+        {
+            if( connector  == NULL ||
+                transport  == NULL ||
+                properties == NULL )
+            {
+                throw exceptions::IllegalArgumentException(
+                    __FILE__, __LINE__,
+                    "ActiveMQConnectionData::ActiveMQConnectionData - "
+                    "Required Parameter was NULL.");
+            }
+
+            this->connector = connector;
+            this->transport = transport;
+            this->properties = properties;
+        }
+
+        /**
+         * Closes the Connector and the Transport and deletes the Connector,
+         * the Transport and the Properties, in that order.
+         *
+         * Every step runs in its own guarded block so that a failure in
+         * one step cannot prevent the remaining objects from being
+         * released.  The AMQ_*_NOTHROW macros are defined outside this
+         * header; presumably they swallow the exception - confirm.
+         */
+        virtual ~ActiveMQConnectionData(void)
+        {
+            try
+            {
+                connector->close();
+            }
+            AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_NOTHROW( )
+
+            try
+            {
+                delete connector;
+            }
+            AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_NOTHROW( )
+
+            try
+            {
+                transport->close();
+            }
+            AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_NOTHROW( )
+
+            try
+            {
+                delete transport;
+            }
+            AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_NOTHROW( )
+
+            try
+            {
+                delete properties;
+            }
+            AMQ_CATCH_NOTHROW( exceptions::ActiveMQException )
+            AMQ_CATCHALL_NOTHROW( )
+        }
+
+        /**
+         * Get the Connector that this Connection Data object holds
+         * @return Connector Pointer
+         */
+        virtual connector::Connector* getConnector(void){
+            return connector;
+        }
+
+        /**
+         * Get the Transport that this Connection Data object holds
+         * @return Transport Pointer
+         */
+        virtual transport::Transport* getTransport(void){
+            return transport;
+        }
+
+        /**
+         * Gets a reference to the properties that were used to configure
+         * this Connection.
+         * @return Properties object reference.
+         */
+        virtual const util::Properties& getProperties(void) const {
+            return *properties;
+        }
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTIONDATA_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,245 @@
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ActiveMQConnectionFactory.h"
+
+#include <activemq/util/Guid.h>
+#include <activemq/util/SimpleProperties.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/connector/ConnectorFactoryMap.h>
+#include <activemq/network/SocketFactory.h>
+#include <activemq/transport/TransportFactoryMap.h>
+#include <activemq/network/Socket.h>
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/util/StringTokenizer.h>
+#include <activemq/support/LibraryInit.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+using namespace activemq::network;
+using namespace activemq::transport;
+
+////////////////////////////////////////////////////////////////////////////////
+// Default constructor: empty credentials and client id, and the default
+// broker URL "tcp://localhost:61616".
+ActiveMQConnectionFactory::ActiveMQConnectionFactory(void)
+:
+    username( "" ),
+    password( "" ),
+    clientId( "" ),
+    brokerURL( "tcp://localhost:61616" )
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Constructs a factory for the given broker URL, storing the supplied
+// credentials and client id for later use by createConnection.
+ActiveMQConnectionFactory::ActiveMQConnectionFactory(const std::string& url,
+                                                     const std::string& username,
+                                                     const std::string& password,
+                                                     const std::string& clientId)
+:
+    username( username ),
+    password( password ),
+    clientId( clientId ),
+    brokerURL( url )
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Connection* ActiveMQConnectionFactory::createConnection(void)
+    throw ( cms::CMSException )
+{
+    // Delegate to the credential-taking overload using the user name and
+    // password currently stored on this factory; the client id argument
+    // is left at that overload's default.
+    // NOTE(review): the factory's own clientId is not forwarded here -
+    // confirm whether that is intended.
+    return this->createConnection( this->username, this->password );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// File-local helper for createConnection: releases whatever had been built
+// before a failure.  Once the ActiveMQConnectionData holder exists it owns
+// the connector, transport and properties (its destructor closes and
+// deletes them), so only the holder is deleted in that case; otherwise the
+// individual objects are deleted directly.
+static void destroyPartialConnection( ActiveMQConnectionData* connectionData,
+                                      Connector* connector,
+                                      Transport* transport,
+                                      SimpleProperties* properties )
+{
+    if( connectionData != NULL )
+    {
+        delete connectionData;
+        return;
+    }
+
+    delete connector;
+    delete transport;
+    delete properties;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a connection for the given identity.
+//
+// The supplied user name, password and client id are stored on this
+// factory.  When the client id is empty a GUID string is generated and
+// stored in its place.  The broker URL is parsed into a property set, a
+// Transport is created from the "transport" property (default "tcp"), a
+// Connector from the "wireFormat" property (default "stomp"), the
+// Connector is started, and everything is handed to a new
+// ActiveMQConnection.
+//
+// On any failure the partially built objects are released.  A caught
+// ActiveMQException is marked and rethrown; any other exception is
+// replaced by an ActiveMQException.
+cms::Connection* ActiveMQConnectionFactory::createConnection(
+    const std::string& username,
+    const std::string& password,
+    const std::string& clientId )
+       throw ( cms::CMSException )
+{
+    // Declared here so that they can be released in the catch blocks
+    SimpleProperties* properties = NULL;
+    Transport* transport = NULL;
+    Connector* connector = NULL;
+    ActiveMQConnectionData* connectionData = NULL;
+
+    this->username = username;
+    this->password = password;
+    this->clientId = clientId;
+
+    try
+    {
+        properties = new SimpleProperties;
+
+        // if no Client Id specified, create one
+        if( this->clientId == "" )
+        {
+            this->clientId = Guid::createGUIDString();
+        }
+
+        // Store login data in the properties
+        properties->setProperty( "username", this->username );
+        properties->setProperty( "password", this->password );
+        properties->setProperty( "clientId", this->clientId );
+
+        // Parse out the properties from the URI
+        parseURL( brokerURL, *properties );
+
+        // Create the Transport that the Connector will use.
+        string factoryName =
+            properties->getProperty( "transport", "tcp" );
+        TransportFactory* factory =
+            TransportFactoryMap::getInstance().lookup( factoryName );
+        if( factory == NULL ){
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQConnectionFactory::createConnection - "
+                "unknown transport factory");
+        }
+
+        // Create the transport.
+        transport = factory->createTransport( *properties );
+        if( transport == NULL ){
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQConnectionFactory::createConnection - "
+                "failed creating new Transport");
+        }
+
+        // What wire format are we using, defaults to Stomp
+        std::string wireFormat =
+            properties->getProperty( "wireFormat", "stomp" );
+
+        // Now try and find a factory to create the Connector
+        ConnectorFactory* connectorfactory =
+            ConnectorFactoryMap::getInstance()->lookup( wireFormat );
+
+        if( connectorfactory == NULL )
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQConnectionFactory::createConnection - "
+                "Connector for Wire Format not registered in Map");
+        }
+
+        // Create the Connector.
+        connector = connectorfactory->createConnector( *properties, transport );
+
+        if(connector == NULL)
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQConnectionFactory::createConnection - "
+                "Failed to Create the Connector");
+        }
+
+        // Start the Connector
+        connector->start();
+
+        // Create Holder and store the data for the Connection
+        connectionData = new ActiveMQConnectionData(
+            connector, transport, properties );
+
+        // Create and Return the new connection object.  If construction
+        // throws, the holder is released by the handlers below.
+        // NOTE(review): assumes ActiveMQConnection does not itself delete
+        // the holder when its constructor fails - confirm.
+        return new ActiveMQConnection( connectionData );
+    }
+    catch( exceptions::ActiveMQException& ex )
+    {
+        ex.setMark( __FILE__, __LINE__ );
+
+        destroyPartialConnection(
+            connectionData, connector, transport, properties );
+
+        // Rethrow the caught object itself so that its dynamic type
+        // (e.g. NullPointerException) is not sliced away by a copy.
+        throw;
+    }
+    catch( ... )
+    {
+        destroyPartialConnection(
+            connectionData, connector, transport, properties );
+
+        throw exceptions::ActiveMQException(
+           __FILE__, __LINE__,
+           "ActiveMQConnectionFactory::create - "
+           "caught unknown exception" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Splits a broker URI of the form
+//     transport://host:port?name=value&name=value
+// into properties: "transport" receives the leading scheme token, "uri"
+// receives the text between the "://" and the first '?' or '&', and every
+// name=value pair that follows is stored under its name.
+// Throws IllegalArgumentException when the URI has fewer than three
+// ":/"-delimited tokens or a parameter is not exactly name=value.
+void ActiveMQConnectionFactory::parseURL(const std::string& URI,
+                                         Properties& properties)
+    throw ( exceptions::IllegalArgumentException )
+{
+    try
+    {
+        StringTokenizer tokenizer(URI, ":/");
+
+        std::vector<std::string> tokens;
+
+        // Require that there be three tokens at the least, these are
+        // transport, url, port.
+        if(tokenizer.countTokens() < 3)
+        {
+            throw exceptions::IllegalArgumentException(
+                __FILE__, __LINE__,
+                (string("ActiveMQConnectionFactory::parseURL - "
+                        "Marlformed URI: ") + URI).c_str());
+        }
+
+        // First element should be the Transport Type, following that is the
+        // URL and any params.
+        properties.setProperty( "transport", tokenizer.nextToken() );
+
+        // Parse URL and Port as one item, optional params follow the ?
+        // and then each param set is delimited with & we extract first
+        // three chars as they are the left over ://
+        properties.setProperty( "uri", tokenizer.nextToken("&?").substr(3) );
+
+        // Now get all the optional parameters and store them as properties
+        int count = tokenizer.toArray(tokens);
+
+        for(int i = 0; i < count; ++i)
+        {
+            tokenizer.reset(tokens[i], "=");
+
+            if(tokenizer.countTokens() != 2)
+            {
+                throw exceptions::IllegalArgumentException(
+                    __FILE__, __LINE__,
+                    (string("ActiveMQConnectionFactory::parseURL - "
+                           "Marlformed Parameter = ") + tokens[i]).c_str());
+            }
+
+            // Read the name and the value in two separate statements.  The
+            // order in which function arguments are evaluated is
+            // unspecified, so calling nextToken() twice inside one
+            // argument list could hand the value over as the name.
+            const std::string name = tokenizer.nextToken();
+            const std::string value = tokenizer.nextToken();
+
+            // Store this param as a property
+            properties.setProperty( name, value );
+        }
+    }
+    AMQ_CATCH_RETHROW( IllegalArgumentException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IllegalArgumentException )
+    AMQ_CATCHALL_THROW( IllegalArgumentException )
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConnectionFactory.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_
+
+#include <cms/ConnectionFactory.h>
+#include <cms/Connection.h>
+
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+namespace activemq{
+
+namespace util{
+
+   // Forward declaration: this header only refers to Properties by
+   // reference, in the signature of parseURL.
+   class Properties;
+
+}
+
+namespace core{
+
+   /**
+    * cms::ConnectionFactory implementation that stores a broker URL, a
+    * user name, a password and a client id, and creates cms::Connection
+    * objects from them.
+    */
+   class ActiveMQConnectionFactory : public cms::ConnectionFactory
+   {
+   private:
+
+      // The user name this factory will use to connect
+      std::string username;
+
+      // The password this factory will use to connect
+      std::string password;
+
+      // The client id to assign to the connection created
+      std::string clientId;
+
+      // The URL of the Broker, the default is:
+      // "tcp://localhost:61616"
+      std::string brokerURL;
+
+   public:
+
+      /**
+       * Constructor
+       */
+      ActiveMQConnectionFactory(void);
+
+      /**
+       * Constructor
+       * @param url the URL of the Broker we are connecting to.
+       * @param username to authenticate with, defaults to ""
+       * @param password to authenticate with, defaults to ""
+       * @param clientId to assign to connection, defaults to ""
+       */
+      ActiveMQConnectionFactory(const std::string& url,
+                                const std::string& username = "",
+                                const std::string& password = "",
+                                const std::string& clientId = "");
+
+      /**
+       * Destructor
+       */
+      virtual ~ActiveMQConnectionFactory(void) {}
+
+      /**
+       * Creates a connection with the default user identity. The
+       * connection is created in stopped mode. No messages will be
+       * delivered until the Connection.start method is explicitly
+       * called.
+       * @return the newly created Connection
+       * @throws CMSException
+       */
+      virtual cms::Connection* createConnection(void) throw ( cms::CMSException );
+
+      /**
+       * Creates a connection with the specified user identity. The
+       * connection is created in stopped mode. No messages will be
+       * delivered until the Connection.start method is explicitly called.
+       * @param username to authenticate with
+       * @param password to authenticate with
+       * @param clientId to assign to connection, defaults to ""
+       * @return the newly created Connection
+       * @throw CMSException.
+       */
+      virtual cms::Connection* createConnection(const std::string& username,
+                                                const std::string& password,
+                                                const std::string& clientId = "")
+         throw ( cms::CMSException );
+
+      /**
+       * Sets the username that should be used when creating a new connection
+       * @param username string
+       */
+      virtual void setUsername(const std::string& username){
+         this->username = username;
+      }
+
+      /**
+       * Gets the username that this factory will use when creating a new
+       * connection instance.
+       * @return username string, "" for default credentials
+       */
+      virtual const std::string& getUsername(void) const {
+         return username;
+      }
+
+      /**
+       * Sets the password that should be used when creating a new connection
+       * @param password string
+       */
+      virtual void setPassword(const std::string& password){
+         this->password = password;
+      }
+
+      /**
+       * Gets the password that this factory will use when creating a new
+       * connection instance.
+       * @return password string, "" for default credentials
+       */
+      virtual const std::string& getPassword(void) const {
+         return password;
+      }
+
+      /**
+       * Sets the Broker URL that should be used when creating a new
+       * connection instance
+       * @param brokerURL string
+       */
+      virtual void setBrokerURL(const std::string& brokerURL){
+         this->brokerURL = brokerURL;
+      }
+
+      /**
+       * Gets the Broker URL that this factory will use when creating a new
+       * connection instance.
+       * @return brokerURL string
+       */
+      virtual const std::string& getBrokerURL(void) const {
+         return brokerURL;
+      }
+
+      /**
+       * Sets the Client Id that should be used when creating a new
+       * connection instance
+       * @param clientId string
+       */
+      virtual void setClientId(const std::string& clientId){
+         this->clientId = clientId;
+      }
+
+      /**
+       * Gets the Client Id that this factory will use when creating a new
+       * connection instance.
+       * @return clientId string
+       */
+      virtual const std::string& getClientId(void) const {
+         return clientId;
+      }
+
+   protected:
+
+      /**
+       * Parses the properties out of the provided Broker URI and sets
+       * them in the passed Properties Object.
+       * @param URI a Broker URI to parse
+       * @param properties a Properties object to set the parsed values in
+       * @throws IllegalArgumentException if the passed URI is invalid
+       */
+      virtual void parseURL(const std::string& URI,
+                            util::Properties& properties)
+         throw ( exceptions::IllegalArgumentException );
+
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONNECTIONFACTORY_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,443 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ActiveMQConsumer.h"
+
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <cms/ExceptionListener.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::exceptions;
+using namespace activemq::concurrent;
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a consumer bound to the given session and consumer info.  No
+// listener is registered and no listener thread exists initially.
+// Throws NullPointerException when either argument is NULL; each case is
+// reported with its own message so the diagnostic names the argument that
+// was actually missing.
+ActiveMQConsumer::ActiveMQConsumer(connector::ConsumerInfo* consumerInfo,
+                                   ActiveMQSession* session)
+{
+    if(session == NULL)
+    {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL Session");
+    }
+
+    if(consumerInfo == NULL)
+    {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "ActiveMQConsumer::ActiveMQConsumer - Init with NULL ConsumerInfo");
+    }
+
+    // Init Consumer Data
+    this->session        = session;
+    this->consumerInfo   = consumerInfo;
+    this->listenerThread = NULL;
+    this->listener       = NULL;
+    this->shutdown       = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tears the consumer down.  Failures are handed to the AMQ_*_NOTHROW
+// macros rather than propagated out of the destructor.
+ActiveMQConsumer::~ActiveMQConsumer(void)
+{
+    try
+    {
+        // Tell the session this resource is going away; per the original
+        // note this should stop further messages from arriving.
+        session->onDestroySessionResource( this );
+
+        // Halt the asynchronous message processing thread, if running.
+        this->stopThread();
+
+        // Drop whatever is still waiting in the queue.
+        this->purgeMessages();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the message selector reported by this consumer's ConsumerInfo.
+std::string ActiveMQConsumer::getMessageSelector(void) const
+    throw ( cms::CMSException )
+{
+    try
+    {
+        const std::string selector = consumerInfo->getMessageSelector();
+        return selector;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Blocking receive: waits until a message is queued and returns a clone of
+// it.  Throws ActiveMQException if the shutdown flag is set while waiting.
+cms::Message* ActiveMQConsumer::receive(void) throw ( cms::CMSException )
+{
+    try
+    {
+        synchronized(&msgQueue)
+        {
+            // Keep waiting while nothing is queued and no shutdown has been
+            // flagged; the test is repeated after every wakeup to cover
+            // spurious wakeups and losing the race for the queue lock.
+            while( !( shutdown || !msgQueue.empty() ) )
+            {
+                msgQueue.wait();
+            }
+
+            // Only reached with the flag set when this object is being
+            // destroyed from another thread.
+            if( shutdown )
+            {
+                throw ActiveMQException( __FILE__, __LINE__,
+                    "Consumer is being destroyed in another thread" );
+            }
+
+            // The caller receives a clone of the queued message.
+            cms::Message* queued = msgQueue.pop();
+            cms::Message* copy = queued->clone();
+
+            // destroyMessage releases the queued instance unless the
+            // session is transacted, in which case the transaction is
+            // left to clean it up.
+            destroyMessage( queued );
+
+            return copy;
+        }
+
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Timed receive: if the queue is empty, waits once for up to the given
+// number of milliseconds.  Returns a clone of the next queued message, or
+// NULL when the queue is still empty after the wait.
+cms::Message* ActiveMQConsumer::receive(int millisecs)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        synchronized(&msgQueue)
+        {
+            // Nothing queued yet, so wait for an arrival or the timeout.
+            if( msgQueue.empty() )
+            {
+                msgQueue.wait( millisecs );
+            }
+
+            // Still nothing after the (single) wait...bail
+            if( msgQueue.empty() )
+            {
+                return NULL;
+            }
+
+            // The caller receives a clone of the queued message.
+            cms::Message* queued = msgQueue.pop();
+            cms::Message* copy = queued->clone();
+
+            // destroyMessage releases the queued instance unless the
+            // session is transacted, in which case the transaction is
+            // left to clean it up.
+            destroyMessage( queued );
+
+            return copy;
+        }
+
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Non-blocking receive: returns a clone of the next queued message, or
+// NULL when nothing is queued.
+cms::Message* ActiveMQConsumer::receiveNoWait(void)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        synchronized(&msgQueue)
+        {
+            if( msgQueue.empty() )
+            {
+                return NULL;
+            }
+
+            // The caller receives a clone of the queued message.
+            cms::Message* queued = msgQueue.pop();
+            cms::Message* copy = queued->clone();
+
+            // destroyMessage releases the queued instance unless the
+            // session is transacted, in which case the transaction is
+            // left to clean it up.
+            destroyMessage( queued );
+
+            return copy;
+        }
+
+        return NULL;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Registers the listener that receives asynchronously delivered messages,
+// then starts the delivery thread.
+void ActiveMQConsumer::setMessageListener(cms::MessageListener* listener)
+{
+    try
+    {
+        // The listener pointer is swapped while holding listenerLock.
+        synchronized(&listenerLock)
+        {
+            this->listener = listener;
+        }
+
+        // Per the original note: starts the thread when it is not yet
+        // running; when it is, wakes it so that it can deliver any backed
+        // up messages to the new listener.
+        this->startThread();
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acknowledges the given message by delegating to the owning session.
+void ActiveMQConsumer::acknowledgeMessage( const ActiveMQMessage* message )
+   throw ( cms::CMSException )
+{
+    try
+    {
+        // Constness is cast away because, in a transactional session, the
+        // message might need to be redelivered and have its data updated.
+        ActiveMQMessage* mutableMessage =
+            const_cast< ActiveMQMessage* >( message );
+
+        session->acknowledge( this, mutableMessage );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Body of the asynchronous delivery loop.  Until the shutdown flag is set
+// it waits for a queued message and a registered listener, pops one
+// message, hands it to the listener and then releases it.  Any exception
+// ends the loop and is reported to the session's ExceptionListener, if
+// one is registered.
+void ActiveMQConsumer::run(void)
+{
+    try
+    {
+        while(!shutdown)
+        {
+            Message* message = NULL;
+
+            synchronized(&msgQueue)
+            {
+
+                // Guard against spurious wakeup or race to sync lock
+                // also if the listener has been unregistered we don't
+                // have anyone to notify, so we wait till a new one is
+                // registered, and then we will deliver the backlog
+                while(msgQueue.empty() || listener == NULL)
+                {
+                    if( shutdown )
+                    {
+                        break;
+                    }
+                    msgQueue.wait();
+                }
+
+                // don't want to process messages if we are shutting down.
+                if(shutdown)
+                {
+                    return;
+                }
+
+                // Dispatch the message
+                message = msgQueue.pop();
+            }
+
+            // Notify the listener.  The message has already left the
+            // queue, so it must be released here even when the listener
+            // throws; otherwise nobody would ever destroy it.
+            try
+            {
+                notifyListener( message );
+            }
+            catch( ... )
+            {
+                destroyMessage( message );
+                throw;
+            }
+
+            // The Message is cleaned up here if the Session is not
+            // transacted, otherwise we let the transaction clean up
+            // this message as it will have already been ack'd and
+            // stored for later redelivery.
+            destroyMessage( message );
+        }
+    }
+    catch( ... )
+    {
+        // Named so that it does not shadow the message listener member.
+        cms::ExceptionListener* exceptionListener =
+            session->getExceptionListener();
+
+        if(exceptionListener != NULL)
+        {
+            exceptionListener->onException( ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQConsumer::run - "
+                "MessageListener threw an unknown Exception, recovering..."));
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Applies the session's acknowledge policy to an incoming message and then
+// appends it to the consumer's queue, waking every thread waiting on it.
+void ActiveMQConsumer::dispatch(ActiveMQMessage* message)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if( session->getAcknowledgeMode() != Session::ClientAcknowledge )
+        {
+            // Every mode other than ClientAcknowledge is ack'd right here.
+            session->acknowledge( this, message );
+        }
+        else
+        {
+            // ClientAcknowledge: register ourself as the handler so that
+            // the Message's own acknowledge method is routed back to us.
+            message->setAckHandler( this );
+        }
+
+        // The message is always queued; receivers and the listener thread
+        // take it from there.
+        cms::Message* queued = dynamic_cast< cms::Message* >( message );
+
+        synchronized(&msgQueue)
+        {
+            msgQueue.push( queued );
+            msgQueue.notifyAll();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Empties the message queue while holding its lock.
+void ActiveMQConsumer::purgeMessages(void)
+{
+    try
+    {
+        synchronized(&msgQueue)
+        {
+            while( !msgQueue.empty() )
+            {
+                // destroyMessage deletes the message only for a session
+                // that is not transacted; otherwise the transaction is
+                // left to clean the message up.
+                cms::Message* pending = msgQueue.pop();
+                destroyMessage( pending );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::onActiveMQMessage( ActiveMQMessage* message )
+    throw ( ActiveMQException )
+{
+    try
+    {
+        // A NULL message cannot be dispatched; report it to the caller.
+        const bool haveMessage = ( message != NULL );
+        if( !haveMessage ) {
+            throw ActiveMQException( __FILE__, __LINE__,
+                "ActiveMQConsumer::onActiveMQMessage - Passed a Null Message");
+        }
+        // Hand the message on to the normal dispatch path.
+        dispatch( message );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::notifyListener( Message* message ){
+    try
+    {
+        MessageListener* target = NULL;
+        synchronized(&listenerLock)
+        {
+            target = getMessageListener();  // read under listenerLock
+        }
+        // NULL messages are skipped (as in destroyMessage), not dereferenced.
+        if(target != NULL && message != NULL)
+        {
+            target->onMessage(*message);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::destroyMessage( Message* message ){
+    try
+    {
+        // Nothing to free for a NULL pointer.
+        if( message == NULL ) {
+            return;
+        }
+        // Transacted sessions are left to clean the message up themselves.
+        const bool transacted = session->isTransacted();
+        if( !transacted ) {
+            delete message;
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::startThread(){
+    try
+    {
+        if(listenerThread == NULL)
+        {
+            // stopThread leaves shutdown set, and that flag tells run to
+            // return; clear it so a thread started after a stop keeps going.
+            shutdown = false;
+            listenerThread = new Thread(this);
+            listenerThread->start();
+        }
+        // Notify the queue so that any pending messages get delivered.
+        synchronized(&msgQueue)
+        {
+            msgQueue.notifyAll();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConsumer::stopThread(){
+    try
+    {
+        // Raise the flag first so run sees it once it wakes up.
+        shutdown = true;
+        // Without a listener thread there is nothing to wake or join.
+        if(listenerThread == NULL)
+        {
+            return;
+        }
+
+        // Force a wakeup in case run is in a wait on the queue.
+        synchronized( &msgQueue )
+        {
+            msgQueue.notifyAll();
+        }
+
+        // Block until run has returned, then release the thread object.
+        listenerThread->join();
+        delete listenerThread;
+        listenerThread = NULL;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQConsumer.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,227 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_
+
+#include <cms/MessageConsumer.h>
+#include <cms/MessageListener.h>
+#include <cms/Message.h>
+#include <cms/CMSException.h>
+
+#include <activemq/connector/ConsumerInfo.h>
+#include <activemq/util/Queue.h>
+#include <activemq/core/ActiveMQAckHandler.h>
+#include <activemq/core/ActiveMQMessageListener.h>
+#include <activemq/core/ActiveMQSessionResource.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/concurrent/Mutex.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQSession;
+
+    class ActiveMQConsumer : 
+        public cms::MessageConsumer,
+        public ActiveMQAckHandler,
+        public concurrent::Runnable,
+        public ActiveMQMessageListener,
+        public ActiveMQSessionResource
+    {
+    private:
+    
+        // The session that owns this Consumer
+        ActiveMQSession* session;
+        
+        // The Consumer info for this Consumer
+        connector::ConsumerInfo* consumerInfo;
+        
+        // The Message Listener for this Consumer, may be NULL
+        cms::MessageListener* listener;
+        
+        // Lock to protect us from dispatching to a dead listener
+        concurrent::Mutex listenerLock;
+        
+        // Queue of messages pushed by dispatch and not yet consumed
+        util::Queue<cms::Message*> msgQueue;
+        
+        // Thread that notifies a listener if one is added
+        concurrent::Thread* listenerThread;
+        
+        // Boolean to indicate that the listener Thread is shutting
+        // down and the run method should return.
+        bool shutdown;
+        
+    public:
+
+        /**
+         * Constructor, takes the Consumer info and the owning session
+         */
+        ActiveMQConsumer(connector::ConsumerInfo* consumerInfo,
+                         ActiveMQSession* session);
+
+        /**
+         * Destructor
+         */
+        virtual ~ActiveMQConsumer(void);
+
+    public:  // Interface Implementation
+    
+        /**
+         * Synchronously Receive a Message
+         * @return new message
+         * @throws CMSException
+         */
+        virtual cms::Message* receive(void) throw ( cms::CMSException );
+
+        /**
+         * Synchronously Receive a Message, time out after defined interval.
+         * Returns null if nothing read.
+         * @param millisecs the time out interval
+         * @throws CMSException
+         */
+        virtual cms::Message* receive(int millisecs) throw ( cms::CMSException );
+
+        /**
+         * Receive a Message, does not wait if there isn't a new message
+         * to read, returns NULL if nothing read.
+         * @return new message
+         * @throws CMSException
+         */
+        virtual cms::Message* receiveNoWait(void) throw ( cms::CMSException );
+
+        /**
+         * Sets the MessageListener that this class will send notifications on
+         * @param listener MessageListener interface pointer
+         */
+        virtual void setMessageListener(cms::MessageListener* listener);
+
+        /**
+         * Gets the MessageListener that this class will send notifications on
+         * @return MessageListener interface pointer
+         */
+        virtual cms::MessageListener* getMessageListener(void) const {
+            return this->listener;
+        }
+
+        /**
+         * Gets this message consumer's message selector expression.
+         * @return This Consumer's selector expression or "".
+         * @throws cms::CMSException
+         */
+        virtual std::string getMessageSelector(void) const 
+          throw ( cms::CMSException );
+          
+        /**
+         * Method called to acknowledge the message passed
+         * @param message the Message to Acknowledge
+         * @throw CMSException
+         */
+        virtual void acknowledgeMessage( const ActiveMQMessage* message )
+            throw ( cms::CMSException );
+
+        /**
+         * Run method that is called from the Thread class when this object
+         * is registered with a Thread and started.  This function reads from
+         * the message queue and dispatches calls to the MessageListener that
+         * is registered with this class.
+         * 
+         * It is an error for a MessageListener to throw an exception in its
+         * onMessage method, but if it does happen this function will get any
+         * registered exception listener from the session and notify it.
+         */            
+        virtual void run(void);
+
+    public:  // ActiveMQMessageListener Methods
+    
+        /**
+         * Called asynchronously when a new message is received, the message
+         * that is passed is now the property of the callee, and the caller
+         * will disavow all knowledge of the message, i.e. Callee must delete.
+         * @param message Message object pointer, NULL causes an exception
+         */
+        virtual void onActiveMQMessage( ActiveMQMessage* message ) 
+            throw ( exceptions::ActiveMQException );
+    
+    public:  // ActiveMQSessionResource
+    
+        /**
+         * Retrieve the Connector resource that is associated with
+         * this Session resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource(void) {
+            return consumerInfo;
+        }
+
+    public:  // ActiveMQConsumer Methods
+    
+        /**
+         * Called to dispatch a message to this consumer, this is usually
+         * called from the context of another thread.  The message is either
+         * acknowledged or, in ClientAcknowledge mode, given this consumer as
+         * its ack handler, and is then enqueued on the Consumer's Queue.
+         * @param message pointer to the message to dispatch
+         * @throw cms::CMSException
+         */
+        virtual void dispatch(ActiveMQMessage* message) 
+            throw ( cms::CMSException );
+
+        /**
+         * Get the Consumer information for this consumer
+         * @return Pointer to a Consumer Info Object            
+         */
+        virtual connector::ConsumerInfo* getConsumerInfo(void) {
+            return consumerInfo;
+        }
+
+    protected:
+            
+        /**
+         * Purges all messages currently in the queue.  This can be as a
+         * result of a rollback, or of the consumer being shutdown.
+         */
+        virtual void purgeMessages(void);
+        
+        /**
+         * Destroys the message unless it is NULL or the session is
+         * transacted; in those cases this does nothing.
+         */
+        virtual void destroyMessage( cms::Message* message );
+
+        /**
+         * Notifies the registered listener, if there is one, of a message.
+         */
+        void notifyListener( cms::Message* message );
+        
+        /**
+         * Starts the message processing thread to receive messages
+         * asynchronously.  This thread is started when setMessageListener
+         * is invoked, which means that the caller is choosing to use this
+         * consumer asynchronously instead of synchronously (receive).
+         */
+        void startThread();
+        
+        /**
+         * Stops the asynchronous message processing thread if it's started.
+         */
+        void stopThread();
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQCONSUMER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessage.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGE_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGE_H_
+
+#include <cms/Message.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQAckHandler;
+
+    /**
+     * Interface for all ActiveMQ Messages that will pass through the core
+     * API layer.  This interface defines a method that the API uses to set
+     * an Acknowledgement handler that will be called by the message when
+     * a user calls the <code>acknowledge</code> method of the Message 
+     * interface.  This is only done when the Session that this message
+     * passes through is in Client Acknowledge mode.
+     */
+    class ActiveMQMessage
+    {
+    public:
+
+        /**
+         * Destructor, virtual so implementations can be deleted through it
+         */
+    	virtual ~ActiveMQMessage(void) {}
+
+        /**
+         * Sets the Acknowledgement Handler that this Message will use
+         * when the Acknowledge method is called.
+         * @param handler the ActiveMQAckHandler to use
+         */
+        virtual void setAckHandler(ActiveMQAckHandler* handler) = 0;
+        
+        /**
+         * Gets the number of times this message has been redelivered.
+         * @return redelivery count
+         */
+        virtual int getRedeliveryCount(void) const = 0;
+        
+        /**
+         * Sets the count of the number of times this message has been 
+         * redelivered
+         * @param count the redelivery count
+         */
+        virtual void setRedeliveryCount(int count) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQMESSAGE_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQMessageListener.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQMESSAGELISTENER_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQMESSAGELISTENER_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQMessage;
+
+    class ActiveMQMessageListener
+    {
+    public:
+
+    	virtual ~ActiveMQMessageListener(void) {}
+
+        /**
+         * Called asynchronously when a new message is received, the message
+         * that is passed is now the property of the callee, and the caller
+         * will disavow all knowledge of the message, i.e. Callee must delete.
+         * @param message pointer to the received Message object
+         */
+        virtual void onActiveMQMessage( ActiveMQMessage* message ) 
+            throw ( exceptions::ActiveMQException ) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQMESSAGELISTENER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ActiveMQProducer.h"
+
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/exceptions/NullPointerException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQProducer::ActiveMQProducer(connector::ProducerInfo* producerInfo,
+                                   ActiveMQSession* session)
+{
+    // Both arguments are required; name the one that is actually missing.
+    if(session == NULL) {
+        throw NullPointerException( __FILE__, __LINE__,
+            "ActiveMQProducer::ActiveMQProducer - Init with NULL Session");
+    }
+    if(producerInfo == NULL) {
+        throw NullPointerException( __FILE__, __LINE__,
+            "ActiveMQProducer::ActiveMQProducer - Init with NULL ProducerInfo");
+    }
+    // Init Producer Data, then default the Delivery options
+    this->session      = session;
+    this->producerInfo = producerInfo;
+    deliveryMode       = cms::Message::PERSISTANT;
+    disableMsgId       = false;
+    disableTimestamps  = false;
+    priority           = 4;
+    timeToLive         = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQProducer::~ActiveMQProducer(void)
+{
+    // Tell the session that this resource is being destroyed.  The catch
+    // macros used here are the NOTHROW variants (defined elsewhere).
+    try {
+        this->session->onDestroySessionResource( this );
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::send(cms::Message& message) 
+    throw ( cms::CMSException )
+{
+    try {
+        // Forward to the two-argument overload with the producer's destination.
+        this->send( producerInfo->getDestination(), message );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQProducer::send(const cms::Destination& destination,
+                            cms::Message& message) throw ( cms::CMSException )
+{
+    try {
+        // Stamp this producer's settings onto the message before sending.
+        // NOTE(review): timeToLive is used as the expiration as-is -- confirm.
+        cms::Message* outgoing = &message;
+        outgoing->setCMSDestination( destination );
+        outgoing->setCMSDeliveryMode( deliveryMode );
+        outgoing->setCMSPriority( priority );
+        outgoing->setCMSExpiration( timeToLive );
+        session->send( outgoing, this );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQProducer.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQPRODUCER_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQPRODUCER_H_
+
+#include <cms/MessageProducer.h>
+#include <cms/Message.h>
+#include <cms/Destination.h>
+
+#include <activemq/core/ActiveMQSessionResource.h>
+#include <activemq/connector/ProducerInfo.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQSession;
+
+    class ActiveMQProducer : public cms::MessageProducer,
+                             public ActiveMQSessionResource
+    {
+    private:
+   
+        // Delivery Mode of this Producer
+        cms::Message::DeliveryMode deliveryMode;
+      
+        // Disable the Message Id
+        bool disableMsgId;
+      
+        // Disable sending timestamps
+        bool disableTimestamps;
+      
+        // Priority Level to send at
+        int priority;
+
+        // Time to live setting for message
+        int timeToLive;
+      
+        // Session that this producer sends to.
+        ActiveMQSession* session;
+      
+        // This Producer's protocol specific info object
+        connector::ProducerInfo* producerInfo;
+      
+    public:
+
+        /**
+         * Constructor, throws NullPointerException if an argument is NULL
+         */
+        ActiveMQProducer( connector::ProducerInfo* producerInfo,
+                          ActiveMQSession* session );
+
+        /**
+         * Destructor
+         */
+        virtual ~ActiveMQProducer(void);
+
+        /**
+         * Sends the message to the default producer destination.
+         * @param message reference to the Message to send
+         * @throws CMSException
+         */
+        virtual void send( cms::Message& message ) throw ( cms::CMSException );
+      
+        /**
+         * Sends the message to the designated destination.
+         * @param destination the target; message the Message to send
+         * @throws CMSException
+         */
+        virtual void send( const cms::Destination& destination,
+                           cms::Message& message) throw ( cms::CMSException );
+
+        /** 
+         * Sets the delivery mode for this Producer
+         * @param mode The DeliveryMode
+         */
+        virtual void setDeliveryMode(cms::Message::DeliveryMode mode) {
+            deliveryMode = mode; 
+        }
+      
+        /** 
+         * Gets the delivery mode for this Producer
+         * @return The DeliveryMode
+         */
+        virtual cms::Message::DeliveryMode getDeliveryMode(void) const {
+            return deliveryMode; 
+        }
+      
+        /**
+         * Sets if Message Ids are disabled for this Producer
+         * @param value true to disable Message Ids, false to enable them
+         */
+        virtual void setDisableMessageId( bool value ) {
+            disableMsgId = value; 
+        }
+      
+        /**
+         * Gets if Message Ids are disabled for this Producer
+         * @return true if Message Ids are disabled, false otherwise
+         */
+        virtual bool getDisableMessageId(void) const {
+            return disableMsgId;
+        }
+
+        /**
+         * Sets if Message Time Stamps are disabled for this Producer
+         * @param value true to disable Time Stamps, false to enable them
+         */
+        virtual void setDisableMessageTimeStamp( bool value ) {
+            disableTimestamps = value;
+        }
+      
+        /**
+         * Gets if Message Time Stamps are disabled for this Producer
+         * @return true if Time Stamps are disabled, false otherwise
+         */
+        virtual bool getDisableMessageTimeStamp(void) const {
+            return disableTimestamps;
+        }
+      
+        /**
+         * Sets the Priority that this Producer sends messages at
+         * @param priority int value for Priority level
+         */
+        virtual void setPriority( int priority ) {
+            this->priority = priority; 
+        }
+      
+        /**
+         * Gets the Priority level that this producer sends messages at
+         * @return int based priority level
+         */
+        virtual int getPriority(void) const {
+            return priority;
+        }
+      
+        /**
+         * Sets the Time to Live that this Producer sends messages with
+         * @param time int value for time to live
+         */
+        virtual void setTimeToLive( int time ) {
+            timeToLive = time;
+        }
+  
+        /**
+         * Gets the Time to Live that this producer sends messages with
+         * @return int based Time to Live
+         */
+        virtual int getTimeToLive(void) const {
+            return timeToLive;
+        }
+      
+    public:  // ActiveMQSessionResource
+    
+        /**
+         * Retrieve the Connector resource that is associated with
+         * this Session resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource(void) {
+            return producerInfo;
+        }
+
+    public:
+   
+        /**
+         * Retrieves this object's ProducerInfo pointer
+         * @return ProducerInfo pointer
+         */
+        virtual connector::ProducerInfo* getProducerInfo(void){
+            return producerInfo;
+        }
+
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQPRODUCER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,545 @@
+/*
+* Copyright 2006 The Apache Software Foundation or its licensors, as
+* applicable.
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+*     http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+#include "ActiveMQSession.h"
+
+#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/NullPointerException.h>
+
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQTransaction.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/core/ActiveMQProducer.h>
+
+#include <activemq/connector/TransactionInfo.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a session around the given SessionInfo and owning connection.
+// Throws NullPointerException when either pointer is NULL.  A Transaction
+// object is allocated only when the session's ack mode is Transactional.
+ActiveMQSession::ActiveMQSession( SessionInfo* sessionInfo,
+                                  const Properties& properties,
+                                  ActiveMQConnection* connection)
+{
+    // A session cannot exist without its descriptor and its connection.
+    const bool missingData = ( sessionInfo == NULL ) || ( connection == NULL );
+    if( missingData ) {
+        throw NullPointerException(
+            __FILE__, __LINE__,
+            "ActiveMQSession::ActiveMQSession - Init with NULL data");
+    }
+
+    // The parameters shadow the members, so every target is qualified.
+    this->closed      = false;
+    this->connection  = connection;
+    this->sessionInfo = sessionInfo;
+    this->transaction = NULL;
+
+    // isTransacted() reads this->sessionInfo, which is already set above.
+    if( isTransacted() ) {
+        this->transaction =
+            new ActiveMQTransaction( connection, this, properties );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Closes the session on destruction.
+ActiveMQSession::~ActiveMQSession(void)
+{
+    // close() releases the session's resources.  The NOTHROW catch macros
+    // below presumably keep any failure from leaving the destructor
+    // (macro definitions are not visible here).
+    try {
+        close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Releases the Transaction object and asks the connector to destroy the
+// session resource.  Calling close() on an already closed session does
+// nothing.
+void ActiveMQSession::close(void) throw ( cms::CMSException )
+{
+    if(closed)
+    {
+        return;
+    }
+
+    try
+    {
+        // Destroy the Transaction and clear the pointer right away: if
+        // destroyResource() below throws, closed stays false and close()
+        // runs again (for instance from the destructor); without the reset
+        // that second pass would delete the same object twice.
+        delete transaction;
+        transaction = NULL;
+
+        // Destroy this session's resources
+        connection->getConnectionData()->
+            getConnector()->destroyResource(sessionInfo);
+
+        // mark as done
+        closed = true;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Commits the current transaction.
+// Throws InvalidStateException when the session is closed, is not
+// transacted, or has no Transaction object; failures reported by the
+// transaction itself are passed on to the caller as well.
+void ActiveMQSession::commit(void) throw ( cms::CMSException )
+{
+    try
+    {
+        // transaction is also checked so a missing Transaction object is
+        // reported instead of being dereferenced.
+        if(closed || !isTransacted() || transaction == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::commit - This Session Can't Commit");
+        }
+
+        // Commit the Transaction
+        transaction->commit();
+    }
+    // Rethrow like the other session methods do; swallowing here would hide
+    // both the state error raised above and any commit failure.
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Rolls back the current transaction.
+// Throws InvalidStateException when the session is closed, is not
+// transacted, or has no Transaction object; failures reported by the
+// transaction itself are passed on to the caller as well.
+void ActiveMQSession::rollback(void) throw ( cms::CMSException )
+{
+    try
+    {
+        // transaction is also checked so a missing Transaction object is
+        // reported instead of being dereferenced.
+        if(closed || !isTransacted() || transaction == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::rollback - This Session Can't Rollback");
+        }
+
+        // Rollback the Transaction
+        transaction->rollback();
+    }
+    // Rethrow like the other session methods do; swallowing here would hide
+    // both the state error raised above and any rollback failure.
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a consumer for the destination without a message selector.
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination)
+        throw ( cms::CMSException )
+{
+    try {
+        // Consumers can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        // Forward to the selector overload with an empty selector string.
+        cms::MessageConsumer* consumer = createConsumer( destination, "" );
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a consumer for the destination using the given message selector
+// and registers it with the connection under its consumer id.
+cms::MessageConsumer* ActiveMQSession::createConsumer(
+    cms::Destination& destination,
+    const std::string& selector)
+        throw ( cms::CMSException )
+{
+    try {
+        // Consumers can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createConsumer - Session Already Closed");
+        }
+
+        // The connector creates the consumer resource; the ActiveMQConsumer
+        // wraps it and keeps a pointer back to this session.
+        ActiveMQConsumer* consumer = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createConsumer( &destination, sessionInfo, selector ),
+            this );
+
+        // Register the new consumer as the connection's message listener
+        // for its consumer id.
+        connection->addMessageListener(
+            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a durable subscriber on the topic, identified by name, using the
+// given selector, and registers it with the connection under its consumer
+// id.  noLocal is forwarded to the connector unchanged.
+// Throws InvalidStateException when the session is already closed.
+cms::MessageConsumer* ActiveMQSession::createDurableConsumer(
+    cms::Topic& destination,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal )
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            // The message names this method (it used to say createProducer).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createDurableConsumer - Session Already Closed");
+        }
+
+        // The connector creates the durable consumer resource; the
+        // ActiveMQConsumer wraps it and keeps a pointer back to this session.
+        ActiveMQConsumer* consumer = new ActiveMQConsumer(
+            connection->getConnectionData()->getConnector()->
+                createDurableConsumer( &destination, sessionInfo, name, selector, noLocal ), this);
+
+        // Register the new consumer as the connection's message listener
+        // for its consumer id.
+        connection->addMessageListener(
+            consumer->getConsumerInfo()->getConsumerId(), consumer );
+
+        return consumer;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a producer that publishes to the given destination.
+cms::MessageProducer* ActiveMQSession::createProducer(
+    cms::Destination& destination)
+        throw ( cms::CMSException )
+{
+    try {
+        // Producers can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createProducer - Session Already Closed");
+        }
+
+        // The connector creates the producer resource; the ActiveMQProducer
+        // wraps it and keeps a pointer back to this session.
+        cms::MessageProducer* created = new ActiveMQProducer(
+            connection->getConnectionData()->getConnector()->
+                createProducer( &destination, sessionInfo ),
+            this );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a Queue identity for the given queue name.
+cms::Queue* ActiveMQSession::createQueue(const std::string& queueName)
+    throw ( cms::CMSException )
+{
+    try {
+        // Destinations can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createQueue - Session Already Closed");
+        }
+
+        // The connector builds the Queue; this session only forwards the
+        // name together with its own SessionInfo.
+        cms::Queue* created = connection->getConnectionData()->
+            getConnector()->createQueue( queueName, sessionInfo );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a Topic identity for the given topic name.
+cms::Topic* ActiveMQSession::createTopic(const std::string& topicName)
+    throw ( cms::CMSException )
+{
+    try {
+        // Destinations can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTopic - Session Already Closed");
+        }
+
+        // The connector builds the Topic; this session only forwards the
+        // name together with its own SessionInfo.
+        cms::Topic* created = connection->getConnectionData()->
+            getConnector()->createTopic( topicName, sessionInfo );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a TemporaryQueue through the connector.
+cms::TemporaryQueue* ActiveMQSession::createTemporaryQueue(void)
+    throw ( cms::CMSException )
+{
+    try {
+        // Destinations can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryQueue - Session Already Closed");
+        }
+
+        // The connector builds the queue for this session's SessionInfo.
+        cms::TemporaryQueue* created = connection->getConnectionData()->
+            getConnector()->createTemporaryQueue( sessionInfo );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a TemporaryTopic through the connector.
+cms::TemporaryTopic* ActiveMQSession::createTemporaryTopic(void)
+    throw ( cms::CMSException )
+{
+    try {
+        // Destinations can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTemporaryTopic - Session Already Closed");
+        }
+
+        // The connector builds the topic for this session's SessionInfo.
+        cms::TemporaryTopic* created = connection->getConnectionData()->
+            getConnector()->createTemporaryTopic( sessionInfo );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an empty Message through the connector.
+cms::Message* ActiveMQSession::createMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        // Messages can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMessage - Session Already Closed");
+        }
+
+        // The connector receives the SessionInfo and the transaction
+        // pointer (NULL for a non-transacted session).
+        cms::Message* created = connection->getConnectionData()->
+            getConnector()->createMessage( sessionInfo, transaction );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an empty BytesMessage through the connector.
+cms::BytesMessage* ActiveMQSession::createBytesMessage(void)
+    throw ( cms::CMSException)
+{
+    try {
+        // Messages can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createBytesMessage - Session Already Closed");
+        }
+
+        // The connector receives the SessionInfo and the transaction
+        // pointer (NULL for a non-transacted session).
+        cms::BytesMessage* created = connection->getConnectionData()->
+            getConnector()->createBytesMessage( sessionInfo, transaction );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a BytesMessage and sets its body to the first bytesSize bytes of
+// the given array.  The closed-session check is done by the no-argument
+// overload this method delegates to.
+cms::BytesMessage* ActiveMQSession::createBytesMessage(
+    const unsigned char* bytes,
+    unsigned long bytesSize) 
+        throw ( cms::CMSException)
+{
+    try
+    {
+        cms::BytesMessage* msg = createBytesMessage();
+
+        // The message is not yet visible to the caller, so it has to be
+        // released here if filling in the body fails; otherwise it leaks.
+        try
+        {
+            msg->setBodyBytes(bytes, bytesSize);
+        }
+        catch( ... )
+        {
+            delete msg;
+            throw;
+        }
+
+        return msg;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an empty TextMessage through the connector.
+cms::TextMessage* ActiveMQSession::createTextMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        // Messages can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createTextMessage - Session Already Closed");
+        }
+
+        // The connector receives the SessionInfo and the transaction
+        // pointer (NULL for a non-transacted session).
+        cms::TextMessage* created = connection->getConnectionData()->
+            getConnector()->createTextMessage( sessionInfo, transaction );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a TextMessage and sets its text to the given string.  The
+// closed-session check is done by the no-argument overload this method
+// delegates to.
+cms::TextMessage* ActiveMQSession::createTextMessage(const std::string& text) 
+    throw ( cms::CMSException )
+{
+    try
+    {
+        cms::TextMessage* msg = createTextMessage();
+
+        // The message is not yet visible to the caller, so it has to be
+        // released here if setting the text fails; otherwise it leaks.
+        try
+        {
+            msg->setText(text.c_str());
+        }
+        catch( ... )
+        {
+            delete msg;
+            throw;
+        }
+
+        return msg;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates an empty MapMessage through the connector.
+cms::MapMessage* ActiveMQSession::createMapMessage(void)
+    throw ( cms::CMSException )
+{
+    try {
+        // Messages can only be created through an open session.
+        if( closed ) {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::createMapMessage - Session Already Closed");
+        }
+
+        // The connector receives the SessionInfo and the transaction
+        // pointer (NULL for a non-transacted session).
+        cms::MapMessage* created = connection->getConnectionData()->
+            getConnector()->createMapMessage( sessionInfo, transaction );
+        return created;
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports the ack mode stored in the SessionInfo; without a SessionInfo the
+// answer is AutoAcknowledge.
+cms::Session::AcknowledgeMode ActiveMQSession::getAcknowledgeMode(void) const
+{
+    if( sessionInfo == NULL ) {
+        return Session::AutoAcknowledge;
+    }
+
+    return sessionInfo->getAckMode();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// True only when a SessionInfo is present and its ack mode is Transactional.
+bool ActiveMQSession::isTransacted(void) const
+{
+    if( sessionInfo == NULL ) {
+        return false;
+    }
+
+    return sessionInfo->getAckMode() == Session::Transactional;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Acknowledges receipt of a message on behalf of the given consumer.
+// In a transacted session the message and its consumer are first recorded
+// in the transaction; the acknowledgement is then delegated to the connector.
+// Throws InvalidStateException when the session is already closed.
+void ActiveMQSession::acknowledge(ActiveMQConsumer* consumer,
+                                  ActiveMQMessage* message)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if( closed )
+        {
+            // The message names this method (it used to say
+            // acknowledgeMessage, which does not exist).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::acknowledge - Session Already Closed");
+        }
+
+        // Stores the Message and its consumer in the transaction, if the
+        // session is a transactional one.
+        if(isTransacted())
+        {
+            transaction->addToTransaction( message, consumer );
+        }
+
+        // Delegate to connector to ack this message.
+        connection->getConnectionData()->
+            getConnector()->acknowledge(
+                sessionInfo, dynamic_cast< cms::Message* >( message ) );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a message on behalf of the given producer by handing it to the
+// connector together with the producer's ProducerInfo.
+// Throws InvalidStateException when the session is already closed and
+// NullPointerException when producer is NULL.
+void ActiveMQSession::send(cms::Message* message, ActiveMQProducer* producer)
+    throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            // The message names this method (it used to say onProducerClose).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::send - Session Already Closed");
+        }
+
+        // producer is dereferenced below, so reject NULL explicitly rather
+        // than crash.
+        if(producer == NULL)
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::send - Producer is NULL");
+        }
+
+        // Send via the connection
+        connection->getConnectionData()->
+            getConnector()->send( message, producer->getProducerInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Called when a session resource is going away.  A consumer is first
+// removed from the connection's listeners and, when a transaction exists,
+// from the transaction; in every case the underlying connector resource is
+// then destroyed.
+// Throws InvalidStateException when the session is already closed and
+// NullPointerException when resource is NULL.
+void ActiveMQSession::onDestroySessionResource( 
+    ActiveMQSessionResource* resource )
+        throw ( cms::CMSException )
+{
+    try
+    {
+        if(closed)
+        {
+            // The message names this method (it used to say onProducerClose).
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::onDestroySessionResource - Session Already Closed");
+        }
+
+        // resource is dereferenced at the end of this method, so reject
+        // NULL explicitly rather than crash.
+        if(resource == NULL)
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQSession::onDestroySessionResource - Resource is NULL");
+        }
+
+        // Only consumers need the extra unregistration steps; the cast
+        // yields NULL for any other kind of resource.
+        ActiveMQConsumer* consumer = 
+            dynamic_cast< ActiveMQConsumer*>( resource );
+
+        if( consumer != NULL )
+        {
+            // Remove this Consumer from the Connection
+            connection->removeMessageListener(
+                consumer->getConsumerInfo()->getConsumerId());
+
+            // Remove this consumer from the Transaction if we are
+            // transactional
+            if( transaction != NULL )
+            {
+                transaction->removeFromTransaction(consumer);
+            }
+        }
+
+        // Free its resources.
+        connection->getConnectionData()->
+            getConnector()->destroyResource( resource->getConnectorResource() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the exception listener registered on the owning connection, or
+// NULL when there is no connection.
+cms::ExceptionListener* ActiveMQSession::getExceptionListener(void)
+{
+    if( connection == NULL ) {
+        return NULL;
+    }
+
+    return connection->getExceptionListener();
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,271 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSION_H_
+
+#include <cms/Session.h>
+#include <cms/ExceptionListener.h>
+#include <activemq/connector/SessionInfo.h>
+#include <activemq/core/ActiveMQSessionResource.h>
+
+namespace activemq{
+namespace core{
+
+   // Forward declarations; the class below only uses pointers to these.
+   class ActiveMQTransaction;
+   class ActiveMQConnection;
+   class ActiveMQConsumer;
+   class ActiveMQMessage;
+   class ActiveMQProducer;
+   
+   /**
+    * cms::Session implementation that forwards its work to the Connector
+    * of the ActiveMQConnection it belongs to.  A Transaction object is
+    * kept only while the session's acknowledge mode is Transactional.
+    */
+   class ActiveMQSession : public cms::Session
+   {
+   private:
+   
+      // SessionInfo for this Session; handed to the connector's
+      // destroyResource() when the session is closed.
+      connector::SessionInfo* sessionInfo;
+      
+      // Transaction Management object; created by the constructor for
+      // transacted sessions, NULL otherwise, deleted by close().
+      ActiveMQTransaction* transaction;
+      
+      // Connection this session belongs to; never deleted by this class.
+      ActiveMQConnection* connection;
+      
+      // Bool to indicate if this session was closed.
+      bool closed;
+      
+   public:
+   
+      /**
+       * Constructor
+       * @param sessionInfo the SessionInfo describing this session
+       * @param properties passed on to the Transaction object when the
+       *        session is transacted
+       * @param connection the connection that owns this session
+       * @throws NullPointerException if sessionInfo or connection is NULL
+       */
+      ActiveMQSession( connector::SessionInfo* sessionInfo,
+                       const util::Properties& properties,
+                       ActiveMQConnection* connection);
+   
+      /**
+       * Destructor, closes the session.
+       */
+      virtual ~ActiveMQSession(void);
+   
+   public:   // Implemented Methods
+   
+      /**
+       * Closes the Session.  Closing an already closed session does
+       * nothing.
+       * @throws CMSException
+       */
+      virtual void close(void) throw ( cms::CMSException );
+      
+      /**
+       * Commits all messages done in this transaction and releases any 
+       * locks currently held.
+       * @throws CMSException
+       */
+      virtual void commit(void) throw ( cms::CMSException );
+
+      /**
+       * Rolls back all messages done in this transaction and releases any 
+       * locks currently held.
+       * @throws CMSException
+       */
+      virtual void rollback(void) throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageConsumer for the specified destination.
+       * @param destination the Destination that this consumer receives
+       *        messages from.
+       * @return the new consumer
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageConsumer for the specified destination, using a 
+       * message selector.
+       * @param destination the Destination that this consumer receives
+       *        messages from.
+       * @param selector the message selector passed to the connector
+       * @return the new consumer
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createConsumer(cms::Destination& destination,
+                                                   const std::string& selector)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a durable subscriber to the specified topic, using a 
+       * message selector
+       * @param destination the topic to subscribe to
+       * @param name name used to identify the subscription
+       * @param selector only messages matching the selector are received
+       * @param noLocal forwarded to the connector unchanged; presumably
+       *        suppresses delivery of locally published messages (confirm
+       *        against the Connector documentation)
+       * @return the new consumer
+       * @throws CMSException
+       */
+      virtual cms::MessageConsumer* createDurableConsumer(
+         cms::Topic& destination,
+         const std::string& name,
+         const std::string& selector,
+         bool noLocal = false)
+            throw ( cms::CMSException );
+
+      /**
+       * Creates a MessageProducer to send messages to the specified 
+       * destination.
+       * @param destination the Destination to publish on
+       * @return the new producer
+       * @throws CMSException
+       */
+      virtual cms::MessageProducer* createProducer(cms::Destination& destination)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a queue identity given a Queue name.
+       * @param queueName the name of the new Queue
+       * @throws CMSException
+       */
+      virtual cms::Queue* createQueue(const std::string& queueName)
+         throw ( cms::CMSException );
+      
+      /**
+       * Creates a topic identity given a Topic name.
+       * @param topicName the name of the new Topic
+       * @throws CMSException
+       */
+      virtual cms::Topic* createTopic(const std::string& topicName)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a TemporaryQueue object.
+       * @throws CMSException
+       */
+      virtual cms::TemporaryQueue* createTemporaryQueue(void)
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a TemporaryTopic object.
+       * @throws CMSException
+       */
+      virtual cms::TemporaryTopic* createTemporaryTopic(void)
+         throw ( cms::CMSException );
+         
+      /**
+       * Creates a new Message
+       * @throws CMSException
+       */
+      virtual cms::Message* createMessage(void) 
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a BytesMessage
+       * @throws CMSException
+       */
+      virtual cms::BytesMessage* createBytesMessage(void) 
+         throw ( cms::CMSException);
+
+      /**
+       * Creates a BytesMessage and sets the payload to the passed value
+       * @param bytes an array of bytes to set in the message
+       * @param bytesSize the size of the bytes array, or number of bytes
+       *        to use
+       * @throws CMSException
+       */
+      virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes,
+                                                    unsigned long bytesSize) 
+         throw ( cms::CMSException);
+
+      /**
+       * Creates a new TextMessage
+       * @throws CMSException
+       */
+      virtual cms::TextMessage* createTextMessage(void) 
+         throw ( cms::CMSException );
+      
+      /**
+       * Creates a new TextMessage and sets the text to the value given
+       * @param text the initial text for the message
+       * @throws CMSException
+       */
+      virtual cms::TextMessage* createTextMessage(const std::string& text) 
+         throw ( cms::CMSException );
+
+      /**
+       * Creates a new MapMessage
+       * @throws CMSException
+       */
+      virtual cms::MapMessage* createMapMessage(void) 
+         throw ( cms::CMSException );
+
+      /**
+       * Returns the acknowledgement mode of the session.
+       * @return the Session's Acknowledge Mode
+       */
+      virtual cms::Session::AcknowledgeMode getAcknowledgeMode(void) const;
+      
+      /**
+       * Gets if the Session is a Transacted Session
+       * @return true if transacted, false otherwise.
+       */
+      virtual bool isTransacted(void) const;
+          
+   public:   // ActiveMQSession specific Methods
+   
+      /**
+       * Sends a message from the Producer specified
+       * @param message the cms::Message to send
+       * @param producer the Producer sending the message; its ProducerInfo
+       *        is passed to the connector
+       * @throws CMSException
+       */
+      virtual void send(cms::Message* message, ActiveMQProducer* producer)
+         throw ( cms::CMSException );
+         
+      /**
+       * When an ActiveMQ core object is closed or destroyed it should call 
+       * back and let the session know that it is going away, this allows 
+       * the session to clean up any associated resources.  This method 
+       * destroys the connector data that is associated with the resource;
+       * a consumer is additionally removed from the connection's message
+       * listeners and from the transaction.
+       * @param resource the session resource that is being destroyed
+       * @throws CMSException
+       */
+      virtual void onDestroySessionResource( ActiveMQSessionResource* resource )
+         throw ( cms::CMSException );
+
+      /**
+       * Called to acknowledge the receipt of a message.  
+       * @param consumer the consumer that received the message
+       * @param message the Message to acknowledge.
+       * @throws CMSException
+       */
+      virtual void acknowledge(ActiveMQConsumer* consumer,
+                               ActiveMQMessage* message)
+         throw ( cms::CMSException);
+         
+      /**
+       * This method gets any registered exception listener of this session's
+       * connection and returns it.  Mainly intended for use by the objects
+       * that this session creates so that they can notify the client of
+       * exceptions that occur in the context of another thread.
+       * @return cms::ExceptionListener pointer or NULL
+       */
+      virtual cms::ExceptionListener* getExceptionListener(void);
+
+      /**
+       * Gets the Session Information object for this session.  The stored
+       * pointer is returned as-is.
+       * NOTE(review): close() hands sessionInfo to the connector's
+       * destroyResource() without resetting this member, so the pointer
+       * may be stale after close - confirm before relying on it.
+       * @return SessionInfo Pointer
+       */
+      virtual connector::SessionInfo* getSessionInfo(void) {
+         return sessionInfo;
+      }
+      
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionResource.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_
+
+#include <activemq/connector/ConnectorResource.h>
+
+namespace activemq{
+namespace core{
+
+    /**
+     * Interface for objects that belong to a session and are backed by a
+     * Connector resource.  ActiveMQSession::onDestroySessionResource uses
+     * it to obtain the resource that must be destroyed in the connector.
+     */
+    class ActiveMQSessionResource
+    {
+    public:
+    
+    	// Virtual so that implementations can be destroyed through a
+    	// pointer to this interface.
+    	virtual ~ActiveMQSessionResource(void) {}
+    
+        /**
+         * Retrieve the Connector resource that is associated with
+         * this Session resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource(void) = 0;
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQSESSIONRESOURCE_H_*/



Mime
View raw message