activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1307147 [3/3] - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/core/ activemq/core/kernels/ cms/
Date Thu, 29 Mar 2012 22:27:33 GMT
Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp?rev=1307147&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp Thu Mar 29 22:27:32 2012
@@ -0,0 +1,1212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQSessionKernel.h"
+
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQConstants.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQProducer.h>
+#include <activemq/core/ActiveMQQueueBrowser.h>
+#include <activemq/core/ActiveMQSessionExecutor.h>
+#include <activemq/core/PrefetchPolicy.h>
+#include <activemq/util/ActiveMQProperties.h>
+#include <activemq/util/CMSExceptionSupport.h>
+
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/DestinationInfo.h>
+#include <activemq/commands/ExceptionResponse.h>
+#include <activemq/commands/ActiveMQDestination.h>
+#include <activemq/commands/ActiveMQTopic.h>
+#include <activemq/commands/ActiveMQQueue.h>
+#include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/ActiveMQMessage.h>
+#include <activemq/commands/ActiveMQBytesMessage.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/commands/ActiveMQMapMessage.h>
+#include <activemq/commands/ActiveMQStreamMessage.h>
+#include <activemq/commands/ActiveMQTempTopic.h>
+#include <activemq/commands/ActiveMQTempQueue.h>
+#include <activemq/commands/MessagePull.h>
+#include <activemq/commands/RemoveInfo.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/TransactionInfo.h>
+#include <activemq/commands/RemoveSubscriptionInfo.h>
+
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+#include <decaf/lang/Runnable.h>
+#include <decaf/lang/Long.h>
+#include <decaf/lang/Math.h>
+#include <decaf/util/Queue.h>
+#include <decaf/lang/exceptions/InvalidStateException.h>
+#include <decaf/lang/exceptions/NullPointerException.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::core;
+using namespace activemq::core::kernels;
+using namespace activemq::commands;
+using namespace activemq::exceptions;
+using namespace activemq::threads;
+using namespace decaf::util;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+namespace {
+
+    /**
+     * Runnable that clears the in-progress messages of a single consumer when
+     * it is run.  ActiveMQSessionKernel::clearMessagesInProgress hands instances
+     * of this class to the connection's Scheduler so that the clear operation is
+     * performed asynchronously.
+     */
+    class ClearConsumerTask : public Runnable {
+    private:
+
+        // The consumer whose in-progress messages are cleared by run().
+        Pointer<ActiveMQConsumerKernel> consumer;
+
+    private:
+
+        // Not copyable.
+        ClearConsumerTask(const ClearConsumerTask&);
+        ClearConsumerTask& operator=(const ClearConsumerTask&);
+
+    public:
+
+        /**
+         * @param consumer
+         *      The consumer to clear, must not be NULL.
+         *
+         * @throws NullPointerException if the given consumer is NULL.
+         */
+        ClearConsumerTask(Pointer<ActiveMQConsumerKernel> consumer) : Runnable(), consumer(consumer) {
+
+            if (consumer == NULL) {
+                // The message names this class; it previously claimed to be a Synchronization.
+                throw NullPointerException(
+                    __FILE__, __LINE__, "ClearConsumerTask Created with NULL Consumer.");
+            }
+        }
+
+        virtual ~ClearConsumerTask() {}
+
+        /**
+         * Asks the wrapped consumer to clear its in-progress messages.
+         */
+        virtual void run() {
+            this->consumer->clearMessagesInProgress();
+        }
+    };
+
+    /**
+     * Synchronization that completes the close of a session once the
+     * Transaction it takes part in has ended.  Both a commit and a rollback
+     * trigger the session's doClose(); nothing is done before the end of the
+     * Transaction.
+     */
+    class CloseSynhcronization : public Synchronization {
+    private:
+
+        // Session to close, not owned by this object.
+        ActiveMQSessionKernel* session;
+
+    private:
+
+        // Not copyable.
+        CloseSynhcronization(const CloseSynhcronization&);
+        CloseSynhcronization& operator=(const CloseSynhcronization&);
+
+    public:
+
+        /**
+         * @param session
+         *      The session to close when the Transaction ends, must not be NULL.
+         *
+         * @throws NullPointerException if the given session is NULL.
+         */
+        CloseSynhcronization(ActiveMQSessionKernel* session) : Synchronization(), session(session) {
+
+            if (this->session == NULL) {
+                throw NullPointerException(
+                    __FILE__, __LINE__, "Synchronization Created with NULL Session.");
+            }
+        }
+
+        virtual ~CloseSynhcronization() {}
+
+        // Nothing to do ahead of the Transaction completing.
+        virtual void beforeEnd() {}
+
+        // Transaction committed, finish closing the session.
+        virtual void afterCommit() {
+            this->session->doClose();
+        }
+
+        // Transaction rolled back, finish closing the session.
+        virtual void afterRollback() {
+            this->session->doClose();
+        }
+    };
+}
+
+////////////////////////////////////////////////////////////////////////////////
+namespace activemq{
+namespace core{
+namespace kernels{
+
+    /**
+     * Private state holder for ActiveMQSessionKernel: the synchronization
+     * registration flag, the list of producers and the scheduler in use.
+     */
+    class SessionConfig {
+    public:
+
+        // Map of consumer kernels keyed by ConsumerId.
+        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
+                                     Pointer<ActiveMQConsumerKernel>,
+                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
+
+    private:
+
+        // Not copyable.
+        SessionConfig(const SessionConfig&);
+        SessionConfig& operator=(const SessionConfig&);
+
+    public:
+
+        // Starts out false.
+        bool synchronizationRegistered;
+
+        // Producers created by the owning session.
+        decaf::util::concurrent::CopyOnWriteArrayList< Pointer<ActiveMQProducerKernel> > producers;
+
+        // Scheduler used by the session, assigned by the session's constructor.
+        Pointer<Scheduler> scheduler;
+
+    public:
+
+        SessionConfig() :
+            synchronizationRegistered(false),
+            producers(),
+            scheduler() {
+        }
+
+        ~SessionConfig() {
+        }
+    };
+
+}}}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates the session kernel: builds and sends the SessionInfo to the broker,
+ * creates the transaction context and the session executor, adopts the
+ * connection's scheduler and starts the session when the connection has
+ * already been started.
+ *
+ * @param connection
+ *      The parent connection, must not be NULL.
+ * @param id
+ *      The id assigned to this session, must not be NULL.
+ * @param ackMode
+ *      The acknowledge mode the session operates in.
+ * @param properties
+ *      Properties handed on to the transaction context.
+ *
+ * @throws ActiveMQException if id or connection is NULL.
+ */
+ActiveMQSessionKernel::ActiveMQSessionKernel(ActiveMQConnection* connection,
+                                             const Pointer<SessionId>& id,
+                                             cms::Session::AcknowledgeMode ackMode,
+                                             const Properties& properties) : config(new SessionConfig),
+                                                                             sessionInfo(),
+                                                                             transaction(),
+                                                                             connection(connection),
+                                                                             consumers(),
+                                                                             closed(false),
+                                                                             executor(),
+                                                                             ackMode(ackMode),
+                                                                             producerIds(),
+                                                                             producerSequenceIds(),
+                                                                             consumerIds(),
+                                                                             lastDeliveredSequenceId(-1) {
+
+    // The config object is only deleted by the destructor, which does not run
+    // when this constructor throws.  Guard it until construction has succeeded
+    // so that a failure below does not leak it.
+    std::auto_ptr<SessionConfig> configGuard(this->config);
+
+    if (id == NULL || connection == NULL) {
+        throw ActiveMQException(
+            __FILE__, __LINE__,
+            "ActiveMQSessionKernel::ActiveMQSessionKernel - Constructor called with NULL data");
+    }
+
+    this->sessionInfo.reset(new SessionInfo());
+    this->sessionInfo->setAckMode(ackMode);
+    this->sessionInfo->setSessionId(id);
+
+    this->connection->oneway(this->sessionInfo);
+
+    // Create a Transaction object
+    this->transaction.reset(new ActiveMQTransactionContext(this, properties));
+
+    // Create the session executor object.
+    this->executor.reset(new ActiveMQSessionExecutor(this));
+
+    // Use the Connection's Scheduler.
+    this->config->scheduler = this->connection->getScheduler();
+
+    // If the connection is already started, start the session.
+    if (this->connection->isStarted()) {
+        this->start();
+    }
+
+    // Construction succeeded, the destructor now owns the config object.
+    configGuard.release();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the session, suppressing any error raised while doing so, and then
+ * frees the private configuration object.
+ */
+ActiveMQSessionKernel::~ActiveMQSessionKernel() {
+
+    // Nothing may escape the destructor, so errors from close() are dropped.
+    try {
+        this->close();
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+
+    delete this->config;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Forwards the given exception to the parent connection's fire() method.
+ * Does nothing when there is no connection.
+ *
+ * @param ex
+ *      The exception to forward.
+ */
+void ActiveMQSessionKernel::fire(const activemq::exceptions::ActiveMQException& ex) {
+
+    if (this->connection == NULL) {
+        return;
+    }
+
+    this->connection->fire(ex);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes this session.  Calling close on an already closed session is a no-op.
+ *
+ * When the session is part of an XA Transaction the session is still closed,
+ * but an UnsupportedOperationException is thrown afterwards to report that
+ * the close happened while the XA Transaction was active.
+ *
+ * @throws UnsupportedOperationException if the session is in an XA Transaction.
+ */
+void ActiveMQSessionKernel::close() {
+
+    // If we're already closed, just return.
+    if( this->closed.get() ) {
+        return;
+    }
+
+    if( this->transaction->isInXATransaction() ) {
+
+        // TODO - Right now we don't have a safe way of dealing with this case
+        // since the session might be deleted before the XA Transaction is finalized
+        // registering a Synchronization could result in a segmentation fault.
+        //
+        // For now we just close badly and throw an exception.
+        doClose();
+
+        // It is the Session that is being closed here, the message previously
+        // referred to a Consumer.
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__,
+            "The Session is still in an Active XA Transaction, commit it first." );
+    }
+
+    try {
+        doClose();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Disposes of the session's resources and then sends a RemoveInfo command for
+ * this session, carrying the last delivered sequence id, to the broker.
+ */
+void ActiveMQSessionKernel::doClose() {
+
+    try {
+
+        // Local cleanup first, it also updates lastDeliveredSequenceId.
+        this->dispose();
+
+        // Tell the Broker that this session is gone.
+        Pointer<RemoveInfo> removeCommand(new RemoveInfo());
+        removeCommand->setObjectId(this->sessionInfo->getSessionId());
+        removeCommand->setLastDeliveredSequenceId(this->lastDeliveredSequenceId);
+
+        this->connection->oneway(removeCommand);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Releases the resources held by this session without sending a RemoveInfo
+ * command for the session itself: stops dispatching, rolls back an active
+ * transaction, disposes of every consumer and producer, and finally removes
+ * the session from its connection and marks it closed.
+ */
+void ActiveMQSessionKernel::dispose() {
+
+    // Scope guard: whatever happens in the body below, on exit the session is
+    // removed from the connection and flagged as closed.
+    class Finalizer {
+    private:
+
+        ActiveMQSessionKernel* session;
+        ActiveMQConnection* connection;
+
+    private:
+
+        Finalizer( const Finalizer& );
+        Finalizer& operator= ( const Finalizer& );
+
+    public:
+
+        Finalizer(ActiveMQSessionKernel* session, ActiveMQConnection* connection) :
+            session( session ), connection( connection ) {
+        }
+
+        ~Finalizer() {
+            // removeSession takes a Pointer, so the raw session pointer is wrapped
+            // in a temporary one only for the duration of the call.
+            Pointer<ActiveMQSessionKernel> session(this->session);
+            try {
+                this->connection->removeSession(session);
+            } catch(...) {
+                /* Absorb, a destructor must not throw. */
+            }
+
+            // The temporary Pointer does not own the session, so it must give the
+            // raw pointer back on every path, not only when removeSession throws.
+            // NOTE(review): assumes Pointer deletes its target when destroyed while
+            // still holding it, which is what the original catch-only release implied.
+            session.release();
+
+            this->session->closed = true;
+        }
+    };
+
+    try{
+
+        Finalizer final(this, this->connection);
+
+        // Stop the dispatch executor.
+        stop();
+
+        // Roll Back the transaction since we were closed without an explicit call
+        // to commit it.
+        if (this->transaction->isInTransaction()) {
+            this->transaction->rollback();
+        }
+
+        // Dispose of all Consumers, the dispose method skips the RemoveInfo command.
+        synchronized(&this->consumers) {
+
+            std::vector< Pointer<ActiveMQConsumerKernel> > closables = this->consumers.values();
+
+            for (std::size_t i = 0; i < closables.size(); ++i) {
+                try{
+                    closables[i]->setFailureError(this->connection->getFirstFailureError());
+                    closables[i]->dispose();
+
+                    // Track the highest sequence id delivered to any consumer.
+                    this->lastDeliveredSequenceId =
+                        Math::max(this->lastDeliveredSequenceId, closables[i]->getLastDeliveredSequenceId());
+                } catch( cms::CMSException& ex ){
+                    /* Absorb */
+                }
+            }
+        }
+
+        // Dispose of all Producers, the dispose method skips the RemoveInfo command.
+        std::auto_ptr<Iterator<Pointer<ActiveMQProducerKernel> > > producerIter(this->config->producers.iterator());
+
+        while( producerIter->hasNext() ) {
+            try{
+                producerIter->next()->dispose();
+            } catch( cms::CMSException& ex ){
+                /* Absorb */
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Commits the session's transaction.  The session must be open and
+ * transacted, otherwise an error is raised.
+ */
+void ActiveMQSessionKernel::commit() {
+
+    try {
+
+        this->checkClosed();
+
+        const bool transacted = this->isTransacted();
+        if (!transacted) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQSessionKernel::commit - This Session is not Transacted");
+        }
+
+        this->transaction->commit();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Rolls back the session's transaction.  The session must be open and
+ * transacted, otherwise an error is raised.
+ */
+void ActiveMQSessionKernel::rollback() {
+
+    try {
+
+        this->checkClosed();
+
+        const bool transacted = this->isTransacted();
+        if (!transacted) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "ActiveMQSessionKernel::rollback - This Session is not Transacted" );
+        }
+
+        this->transaction->rollback();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Calls rollback() on every consumer of this session.  Only permitted on an
+ * open session that is not transacted.
+ */
+void ActiveMQSessionKernel::recover() {
+
+    try {
+
+        this->checkClosed();
+
+        if (this->isTransacted()) {
+            throw cms::IllegalStateException("This session is transacted");
+        }
+
+        synchronized(&this->consumers) {
+
+            // Work on a copy of the current consumer set.
+            std::vector< Pointer<ActiveMQConsumerKernel> > snapshot = this->consumers.values();
+
+            for (std::size_t i = 0; i < snapshot.size(); ++i) {
+                snapshot[i]->rollback();
+            }
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Clears the executor's in-progress messages, then flags every consumer as
+ * needing a clear and schedules a ClearConsumerTask for it, with no delay, on
+ * the connection's scheduler.
+ */
+void ActiveMQSessionKernel::clearMessagesInProgress() {
+
+    if (this->executor.get() != NULL) {
+        this->executor->clearMessagesInProgress();
+    }
+
+    synchronized(&this->consumers) {
+
+        // Work on a copy of the current consumer set.
+        std::vector< Pointer<ActiveMQConsumerKernel> > snapshot = this->consumers.values();
+
+        for (std::size_t i = 0; i < snapshot.size(); ++i) {
+
+            snapshot[i]->inProgressClearRequired();
+
+            this->connection->getScheduler()->executeAfterDelay(
+                new ClearConsumerTask(snapshot[i]), 0LL);
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Calls acknowledge() on every consumer of this session.
+ */
+void ActiveMQSessionKernel::acknowledge() {
+
+    synchronized(&this->consumers) {
+
+        // Work on a copy of the current consumer set.
+        std::vector< Pointer<ActiveMQConsumerKernel> > snapshot = this->consumers.values();
+
+        for (std::size_t i = 0; i < snapshot.size(); ++i) {
+            snapshot[i]->acknowledge();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Calls deliverAcks() on every consumer of this session.
+ */
+void ActiveMQSessionKernel::deliverAcks() {
+
+    synchronized(&this->consumers) {
+
+        // Work on a copy of the current consumer set.
+        std::vector< Pointer<ActiveMQConsumerKernel> > snapshot = this->consumers.values();
+
+        for (std::size_t i = 0; i < snapshot.size(); ++i) {
+            snapshot[i]->deliverAcks();
+        }
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a consumer for the destination with an empty selector and with
+ * noLocal set to false.
+ *
+ * @param destination
+ *      The destination to consume from.
+ *
+ * @returns a new MessageConsumer, returned as a raw pointer.
+ */
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination ) {
+
+    try {
+        this->checkClosed();
+
+        const std::string emptySelector("");
+        return this->createConsumer(destination, emptySelector, false);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a consumer for the destination using the given selector and with
+ * noLocal set to false.
+ *
+ * @param destination
+ *      The destination to consume from.
+ * @param selector
+ *      The message selector to apply.
+ *
+ * @returns a new MessageConsumer, returned as a raw pointer.
+ */
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination,
+                                                       const std::string& selector ) {
+
+    try {
+        this->checkClosed();
+
+        const bool noLocal = false;
+        return this->createConsumer(destination, selector, noLocal);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a consumer on the given destination, registers it with this session
+ * and with the broker, and starts it when the connection is already started.
+ * The prefetch size is taken from the connection's prefetch policy, using the
+ * topic value for topics and the queue value otherwise.
+ *
+ * @param destination
+ *      The destination to consume from, must be an ActiveMQDestination.
+ * @param selector
+ *      The message selector to apply.
+ * @param noLocal
+ *      Passed through to the consumer kernel.
+ *
+ * @returns a new MessageConsumer, returned as a raw pointer.
+ */
+cms::MessageConsumer* ActiveMQSessionKernel::createConsumer( const cms::Destination* destination,
+                                                       const std::string& selector,
+                                                       bool noLocal ) {
+
+    try{
+
+        this->checkClosed();
+
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
+
+        if( amqDestination == NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
+
+        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
+
+        int prefetch = 0;
+        if( dest->isTopic() ) {
+            prefetch = this->connection->getPrefetchPolicy()->getTopicPrefetch();
+        } else {
+            prefetch = this->connection->getPrefetchPolicy()->getQueuePrefetch();
+        }
+
+        // Create the consumer instance.
+        Pointer<ActiveMQConsumerKernel> consumer(
+            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
+                                       dest, "", selector, prefetch, 0, noLocal,
+                                       false, this->connection->isDispatchAsync(), NULL));
+
+        try{
+            this->addConsumer(consumer);
+            this->connection->syncRequest(consumer->getConsumerInfo());
+        } catch (Exception&) {
+            // Undo the registration, then rethrow the original exception object.
+            // A bare throw keeps its dynamic type; "throw ex" would slice it.
+            this->removeConsumer(consumer->getConsumerId());
+            throw;
+        }
+
+        if (this->connection->isStarted()) {
+            consumer->start();
+        }
+
+        return new ActiveMQConsumer(consumer);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a durable consumer on the given topic, registers it with this
+ * session and with the broker, and starts it when the connection is already
+ * started.  The prefetch size is the durable topic value of the connection's
+ * prefetch policy.
+ *
+ * @param destination
+ *      The topic to consume from, must be an ActiveMQDestination.
+ * @param name
+ *      The subscription name handed to the consumer kernel.
+ * @param selector
+ *      The message selector to apply.
+ * @param noLocal
+ *      Passed through to the consumer kernel.
+ *
+ * @returns a new MessageConsumer, returned as a raw pointer.
+ */
+cms::MessageConsumer* ActiveMQSessionKernel::createDurableConsumer( const cms::Topic* destination,
+                                                              const std::string& name,
+                                                              const std::string& selector,
+                                                              bool noLocal ) {
+
+    try{
+
+        this->checkClosed();
+
+        // Cast the destination to an OpenWire destination, so we can
+        // get all the goodies.
+        const ActiveMQDestination* amqDestination =
+            dynamic_cast<const ActiveMQDestination*>( destination );
+
+        if( amqDestination == NULL ) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
+
+        Pointer<ActiveMQDestination> dest( amqDestination->cloneDataStructure() );
+
+        // Create the consumer instance.
+        Pointer<ActiveMQConsumerKernel> consumer(
+            new ActiveMQConsumerKernel(this, this->getNextConsumerId(),
+                                       dest, name, selector,
+                                       this->connection->getPrefetchPolicy()->getDurableTopicPrefetch(),
+                                       0, noLocal, false, this->connection->isDispatchAsync(), NULL));
+
+        try {
+            this->addConsumer(consumer);
+            this->connection->syncRequest(consumer->getConsumerInfo());
+        } catch (Exception&) {
+            // Undo the registration, then rethrow the original exception object.
+            // A bare throw keeps its dynamic type; "throw ex" would slice it.
+            this->removeConsumer(consumer->getConsumerId());
+            throw;
+        }
+
+        if (this->connection->isStarted()) {
+            consumer->start();
+        }
+
+        return new ActiveMQConsumer(consumer);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a producer, registers it with this session and announces it to the
+ * broker.
+ *
+ * @param destination
+ *      The destination to send to.  May be NULL, in which case the destination
+ *      is specified by the messages as they are sent; when not NULL it must be
+ *      an ActiveMQDestination.
+ *
+ * @returns a new MessageProducer, returned as a raw pointer.
+ */
+cms::MessageProducer* ActiveMQSessionKernel::createProducer( const cms::Destination* destination ) {
+
+    try{
+
+        this->checkClosed();
+
+        Pointer<commands::ActiveMQDestination> dest;
+
+        // Producers are allowed to have NULL destinations.  In this case, the
+        // destination is specified by the messages as they are sent.
+        if (destination != NULL) {
+
+            const ActiveMQDestination* amqDestination =
+                dynamic_cast<const ActiveMQDestination*> (destination);
+
+            if (amqDestination == NULL) {
+                throw ActiveMQException(
+                    __FILE__, __LINE__,
+                    "Destination was either NULL or not created by this CMS Client" );
+            }
+
+            // Cast the destination to an OpenWire destination, so we can
+            // get all the goodies.
+            dest.reset(amqDestination->cloneDataStructure());
+        }
+
+        // Create the producer instance.
+        Pointer<ActiveMQProducerKernel> producer( new ActiveMQProducerKernel(
+            this, this->getNextProducerId(), dest, this->connection->getSendTimeout() ) );
+
+        try{
+            this->addProducer(producer);
+            this->connection->oneway(producer->getProducerInfo());
+        } catch (Exception&) {
+            // Undo the registration, then rethrow the original exception object.
+            // A bare throw keeps its dynamic type; "throw ex" would slice it.
+            this->removeProducer(producer->getProducerId());
+            throw;
+        }
+
+        return new ActiveMQProducer(producer);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a browser for the given queue using an empty selector.
+ *
+ * @param queue
+ *      The queue to browse.
+ *
+ * @returns a new QueueBrowser, returned as a raw pointer.
+ */
+cms::QueueBrowser* ActiveMQSessionKernel::createBrowser( const cms::Queue* queue ) {
+
+    try {
+        const std::string emptySelector("");
+        return ActiveMQSessionKernel::createBrowser(queue, emptySelector);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a browser for the given queue that applies the given selector.
+ *
+ * @param queue
+ *      The queue to browse, must be an ActiveMQDestination.
+ * @param selector
+ *      The message selector to apply.
+ *
+ * @returns a new QueueBrowser, returned as a raw pointer.
+ */
+cms::QueueBrowser* ActiveMQSessionKernel::createBrowser(const cms::Queue* queue,
+                                                  const std::string& selector) {
+
+    try {
+
+        this->checkClosed();
+
+        // The queue has to be one of our own OpenWire destinations.
+        const ActiveMQDestination* openWireQueue =
+            dynamic_cast<const ActiveMQDestination*> (queue);
+
+        if (openWireQueue == NULL) {
+            throw ActiveMQException(
+                __FILE__, __LINE__,
+                "Destination was either NULL or not created by this CMS Client" );
+        }
+
+        Pointer<ActiveMQDestination> target(openWireQueue->cloneDataStructure());
+
+        // Build the QueueBrowser and hand it straight to the caller.
+        return new ActiveMQQueueBrowser(this, this->getNextConsumerId(), target,
+                                        selector, this->connection->isDispatchAsync());
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a queue destination object with the given name.
+ *
+ * @param queueName
+ *      The name of the queue, must not be empty.
+ *
+ * @returns a new Queue, returned as a raw pointer.
+ */
+cms::Queue* ActiveMQSessionKernel::createQueue( const std::string& queueName ) {
+
+    try {
+
+        this->checkClosed();
+
+        if (queueName.empty()) {
+            throw IllegalArgumentException(
+                __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
+        }
+
+        cms::Queue* created = new commands::ActiveMQQueue(queueName);
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a topic destination object with the given name.
+ *
+ * @param topicName
+ *      The name of the topic, must not be empty.
+ *
+ * @returns a new Topic, returned as a raw pointer.
+ */
+cms::Topic* ActiveMQSessionKernel::createTopic( const std::string& topicName ) {
+
+    try {
+
+        this->checkClosed();
+
+        if (topicName.empty()) {
+            throw IllegalArgumentException(
+                __FILE__, __LINE__, "Destination Name cannot be the Empty String." );
+        }
+
+        cms::Topic* created = new commands::ActiveMQTopic(topicName);
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a temporary queue with a generated name and registers it with the
+ * broker.
+ *
+ * @returns a new TemporaryQueue, returned as a raw pointer.
+ */
+cms::TemporaryQueue* ActiveMQSessionKernel::createTemporaryQueue() {
+
+    try {
+
+        this->checkClosed();
+
+        // Held by auto_ptr so it is freed should the registration fail.
+        std::auto_ptr<commands::ActiveMQTempQueue> tempQueue(
+            new commands::ActiveMQTempQueue(this->createTemporaryDestinationName()));
+
+        this->createTemporaryDestination(tempQueue.get());
+
+        return tempQueue.release();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a temporary topic with a generated name and registers it with the
+ * broker.
+ *
+ * @returns a new TemporaryTopic, returned as a raw pointer.
+ */
+cms::TemporaryTopic* ActiveMQSessionKernel::createTemporaryTopic() {
+
+    try {
+
+        this->checkClosed();
+
+        // Held by auto_ptr so it is freed should the registration fail.
+        std::auto_ptr<commands::ActiveMQTempTopic> tempTopic(
+            new commands::ActiveMQTempTopic(this->createTemporaryDestinationName()));
+
+        this->createTemporaryDestination(tempTopic.get());
+
+        return tempTopic.release();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates an empty message bound to this session's connection.
+ *
+ * @returns a new Message, returned as a raw pointer.
+ */
+cms::Message* ActiveMQSessionKernel::createMessage() {
+
+    try {
+        this->checkClosed();
+
+        commands::ActiveMQMessage* created = new commands::ActiveMQMessage();
+        created->setConnection(this->connection);
+
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates an empty bytes message bound to this session's connection.
+ *
+ * @returns a new BytesMessage, returned as a raw pointer.
+ */
+cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage() {
+
+    try {
+        this->checkClosed();
+
+        commands::ActiveMQBytesMessage* created = new commands::ActiveMQBytesMessage();
+        created->setConnection(this->connection);
+
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a bytes message whose body is set from the given buffer.
+ *
+ * @param bytes
+ *      The buffer passed to setBodyBytes.
+ * @param bytesSize
+ *      The number of bytes passed to setBodyBytes.
+ *
+ * @returns a new BytesMessage, returned as a raw pointer.
+ */
+cms::BytesMessage* ActiveMQSessionKernel::createBytesMessage( const unsigned char* bytes, int bytesSize ) {
+
+    try{
+
+        this->checkClosed();
+
+        // Hold the new message in an auto_ptr so that it is freed rather than
+        // leaked when setting the body throws.
+        std::auto_ptr<cms::BytesMessage> msg(createBytesMessage());
+        msg->setBodyBytes(bytes, bytesSize);
+        return msg.release();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates an empty stream message bound to this session's connection.
+ *
+ * @returns a new StreamMessage, returned as a raw pointer.
+ */
+cms::StreamMessage* ActiveMQSessionKernel::createStreamMessage() {
+
+    try {
+        this->checkClosed();
+
+        commands::ActiveMQStreamMessage* created = new commands::ActiveMQStreamMessage();
+        created->setConnection(this->connection);
+
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates an empty text message bound to this session's connection.
+ *
+ * @returns a new TextMessage, returned as a raw pointer.
+ */
+cms::TextMessage* ActiveMQSessionKernel::createTextMessage() {
+
+    try {
+        this->checkClosed();
+
+        commands::ActiveMQTextMessage* created = new commands::ActiveMQTextMessage();
+        created->setConnection(this->connection);
+
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates a text message whose body is set to the given text.
+ *
+ * @param text
+ *      The text to place in the message body.
+ *
+ * @returns a new TextMessage, returned as a raw pointer.
+ */
+cms::TextMessage* ActiveMQSessionKernel::createTextMessage( const std::string& text ) {
+
+    try {
+
+        this->checkClosed();
+
+        // Hold the new message in an auto_ptr so that it is freed rather than
+        // leaked when setting the text throws.
+        std::auto_ptr<cms::TextMessage> msg(createTextMessage());
+
+        // Pass the string itself; going through c_str() would cut the text
+        // short at an embedded NUL character.
+        msg->setText(text);
+        return msg.release();
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Creates an empty map message bound to this session's connection.
+ *
+ * @returns a new MapMessage, returned as a raw pointer.
+ */
+cms::MapMessage* ActiveMQSessionKernel::createMapMessage() {
+
+    try {
+        this->checkClosed();
+
+        commands::ActiveMQMapMessage* created = new commands::ActiveMQMapMessage();
+        created->setConnection(this->connection);
+
+        return created;
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @returns the acknowledge mode this session was created with.
+ */
+cms::Session::AcknowledgeMode ActiveMQSessionKernel::getAcknowledgeMode() const {
+    const cms::Session::AcknowledgeMode mode = this->ackMode;
+    return mode;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * @returns true when the acknowledge mode is SESSION_TRANSACTED or when the
+ *          session is currently in an XA Transaction.
+ */
+bool ActiveMQSessionKernel::isTransacted() const {
+
+    if (this->ackMode == Session::SESSION_TRANSACTED) {
+        return true;
+    }
+
+    return this->transaction->isInXATransaction();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Sends the given message to the broker on behalf of the given producer.
+ *
+ * The message is stamped with a new MessageId and the current transaction id,
+ * then a copy of it is sent, either as a oneway command or as a synchronous
+ * request depending on the connection settings and message properties.
+ *
+ * @param message
+ *      The message to send, must be an OpenWire commands::Message.
+ * @param producer
+ *      The producer the message is sent for; it is dereferenced without a
+ *      NULL check.
+ * @param usage
+ *      Optional usage tracker (may be NULL); it is only updated when the
+ *      message is sent as a oneway command.
+ */
+void ActiveMQSessionKernel::send(cms::Message* message, ActiveMQProducerKernel* producer, util::Usage* usage) {
+
+    try {
+
+        this->checkClosed();
+
+        commands::Message* amqMessage = dynamic_cast< commands::Message* >(message);
+
+        if (amqMessage == NULL) {
+            throw ActiveMQException(__FILE__, __LINE__,
+                "ActiveMQSessionKernel::send - Message is not a valid Open Wire type.");
+        }
+
+        // Clear any old data that might be in the message object
+        amqMessage->getMessageId().reset(NULL);
+        amqMessage->getProducerId().reset(NULL);
+        amqMessage->getTransactionId().reset(NULL);
+
+        // Always assign the message ID, regardless of the disable
+        // flag.  Not adding a message ID will cause an NPE at the broker.
+        decaf::lang::Pointer<commands::MessageId> id(new commands::MessageId());
+        id->setProducerId(producer->getProducerInfo()->getProducerId());
+        id->setProducerSequenceId(this->getNextProducerSequenceId());
+
+        amqMessage->setMessageId(id);
+
+        // Ensure that a new transaction is started if this is the first message
+        // sent since the last commit.
+        doStartTransaction();
+        amqMessage->setTransactionId(this->transaction->getTransactionId());
+
+        // NOTE:
+        // Now we copy the message before sending, this allows the user to reuse the
+        // message object without interfering with the copy that's being sent.  We
+        // could make this step optional to increase performance but for now we won't.
+        // To not do this implies that the user must never reuse the message object, or
+        // know that the configuration of Transports doesn't involve the message hanging
+        // around beyond the point that send returns.
+        Pointer<commands::Message> msgCopy(amqMessage->cloneDataStructure());
+
+        msgCopy->onSend();
+        msgCopy->setProducerId( producer->getProducerInfo()->getProducerId() );
+
+        // A oneway send is used only when no send timeout is set, no response is
+        // required, sync sends are not forced, and the message is non-persistent,
+        // or async sends are enabled, or the message is part of a transaction.
+        if (this->connection->getSendTimeout() <= 0 &&
+            !msgCopy->isResponseRequired() &&
+            !this->connection->isAlwaysSyncSend() &&
+            (!msgCopy->isPersistent() || this->connection->isUseAsyncSend() ||
+               msgCopy->getTransactionId() != NULL)) {
+
+            if (usage != NULL) {
+                usage->enqueueUsage(msgCopy->getSize());
+            }
+
+            // No Response Required.
+            this->connection->oneway(msgCopy);
+
+        } else {
+
+            // Send the message to the broker.
+            this->connection->syncRequest(msgCopy, this->connection->getSendTimeout());
+        }
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Hands back the exception listener of this session's connection, or NULL
+// when the session has no connection.
+cms::ExceptionListener* ActiveMQSessionKernel::getExceptionListener() {
+
+    if (this->connection == NULL) {
+        return NULL;
+    }
+
+    return this->connection->getExceptionListener();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Returns the Scheduler held by this session's configuration object.
+Pointer<Scheduler> ActiveMQSessionKernel::getScheduler() const {
+    return config->scheduler;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Asks the broker to remove the subscription registered under the given name.
+// A RemoveSubscriptionInfo command carrying this connection's id and client id
+// is sent synchronously; failures surface through
+// AMQ_CATCH_ALL_THROW_CMSEXCEPTION.
+void ActiveMQSessionKernel::unsubscribe(const std::string& name) {
+
+    try {
+
+        this->checkClosed();
+
+        Pointer<RemoveSubscriptionInfo> removal(new RemoveSubscriptionInfo());
+
+        removal->setConnectionId(
+            this->connection->getConnectionInfo().getConnectionId());
+        removal->setSubcriptionName(name);
+        removal->setClientId(
+            this->connection->getConnectionInfo().getClientId());
+
+        // Blocks until the broker has answered the removal request.
+        this->connection->syncRequest(removal);
+    }
+    AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Forwards an incoming MessageDispatch to the session executor.  When the
+// session has no executor the dispatch is silently ignored.
+void ActiveMQSessionKernel::dispatch(const Pointer<MessageDispatch>& dispatch) {
+
+    if (this->executor.get() == NULL) {
+        return;
+    }
+
+    this->executor->execute(dispatch);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes every message from the given channel and hands each one to the
+// session executor via executeFirst.  The messages are walked in reverse,
+// presumably so that the repeated executeFirst calls leave them in their
+// original relative order (confirm against ActiveMQSessionExecutor).
+//
+// @param unconsumedMessages  channel holding the messages to be redelivered.
+void ActiveMQSessionKernel::redispatch(MessageDispatchChannel& unconsumedMessages) {
+
+    // Match the NULL guard used by dispatch/start/stop/wakeup.  Checking before
+    // the channel is drained means the messages stay in the channel instead of
+    // being removed and then lost on a NULL executor dereference.
+    if (this->executor.get() == NULL) {
+        return;
+    }
+
+    std::vector< Pointer<MessageDispatch> > messages = unconsumedMessages.removeAll();
+    std::vector< Pointer<MessageDispatch> >::reverse_iterator iter = messages.rbegin();
+
+    for (; iter != messages.rend(); ++iter) {
+        this->executor->executeFirst(*iter);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Starts every consumer currently held in the consumers map (inside the
+// synchronized block on that map), then starts the session executor when one
+// exists.
+void ActiveMQSessionKernel::start() {
+
+    synchronized(&this->consumers) {
+        std::vector< Pointer<ActiveMQConsumerKernel> > snapshot = this->consumers.values();
+        std::vector< Pointer<ActiveMQConsumerKernel> >::iterator current = snapshot.begin();
+
+        while (current != snapshot.end()) {
+            (*current)->start();
+            ++current;
+        }
+    }
+
+    if (this->executor.get() == NULL) {
+        return;
+    }
+
+    this->executor->start();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Stops the session executor; does nothing when the session has no executor.
+void ActiveMQSessionKernel::stop() {
+
+    if (this->executor.get() == NULL) {
+        return;
+    }
+
+    this->executor->stop();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Reports whether the session executor is running.  A session without an
+// executor is never considered started.
+bool ActiveMQSessionKernel::isStarted() const {
+    return this->executor.get() != NULL && this->executor->isRunning();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Announces a new temporary destination to the broker by synchronously sending
+// a DestinationInfo "add" command that carries a clone of the destination.
+// Once the request has succeeded, the destination is linked to this session's
+// connection.
+void ActiveMQSessionKernel::createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination) {
+
+    try {
+
+        Pointer<DestinationInfo> addCommand(new DestinationInfo());
+        addCommand->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        addCommand->setOperationType(ActiveMQConstants::DESTINATION_ADD_OPERATION);
+
+        // The command owns its own copy of the destination.
+        Pointer<ActiveMQTempDestination> destinationCopy(tempDestination->cloneDataStructure());
+        addCommand->setDestination(destinationCopy);
+
+        // Goes through the session's own syncRequest, which checks for closure.
+        this->syncRequest(addCommand);
+
+        // Now that its setup, link it to this Connection so it can be closed.
+        tempDestination->setConnection(this->connection);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Tells the broker that a temporary destination is being removed by
+// synchronously sending a DestinationInfo "remove" command that carries a
+// clone of the destination.  The request is issued directly on the connection.
+void ActiveMQSessionKernel::destroyTemporaryDestination(
+    commands::ActiveMQTempDestination* tempDestination) {
+
+    try {
+
+        Pointer<DestinationInfo> removeCommand(new DestinationInfo());
+
+        removeCommand->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        removeCommand->setOperationType(ActiveMQConstants::DESTINATION_REMOVE_OPERATION);
+
+        // The command owns its own copy of the destination.
+        Pointer<ActiveMQTempDestination> destinationCopy(tempDestination->cloneDataStructure());
+        removeCommand->setDestination(destinationCopy);
+
+        // Blocks until the broker has answered.
+        this->connection->syncRequest(removeCommand);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Builds a temporary destination name of the form "<connection id>:<n>", where
+// <n> is the next temporary destination id handed out by the connection.
+std::string ActiveMQSessionKernel::createTemporaryDestinationName() {
+
+    try {
+        std::string name = this->connection->getConnectionId().getValue();
+        name += ":";
+        name += Long::toString(this->connection->getNextTempDestinationId());
+        return name;
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a command through the connection without waiting for a response,
+// after verifying that the session has not been closed.
+void ActiveMQSessionKernel::oneway(Pointer<Command> command) {
+
+    try {
+        checkClosed();
+        connection->oneway(command);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Sends a command through the connection and returns the Response it yields,
+// after verifying that the session has not been closed.  The timeout is passed
+// to the connection unchanged.
+Pointer<Response> ActiveMQSessionKernel::syncRequest(Pointer<Command> command, unsigned int timeout) {
+
+    try {
+        checkClosed();
+        return connection->syncRequest(command, timeout);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Throws an ActiveMQException when the closed flag is set; otherwise returns
+// without doing anything.
+void ActiveMQSessionKernel::checkClosed() const {
+
+    if (!this->closed.get()) {
+        return;
+    }
+
+    throw ActiveMQException(
+        __FILE__, __LINE__,
+        "ActiveMQSessionKernel - Session Already Closed" );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Records the consumer in this session's consumers map, keyed by its
+// ConsumerId, and then registers this session with the connection as the
+// dispatcher for that id.
+void ActiveMQSessionKernel::addConsumer(Pointer<ActiveMQConsumerKernel> consumer) {
+
+    try {
+
+        this->checkClosed();
+
+        // The map is only modified inside the synchronized block.
+        synchronized(&this->consumers) {
+            this->consumers.put(
+                consumer->getConsumerInfo()->getConsumerId(), consumer);
+        }
+
+        // Messages arriving for this consumer id are routed to this session.
+        this->connection->addDispatcher(
+            consumer->getConsumerInfo()->getConsumerId(), this);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Drops the consumer with the given id from this session.  When the id is
+// present in the consumers map, the connection's dispatcher entry is removed
+// first and the map entry second; an unknown id is ignored.
+void ActiveMQSessionKernel::removeConsumer(const Pointer<ConsumerId>& consumerId) {
+
+    try {
+
+        this->checkClosed();
+
+        synchronized(&this->consumers) {
+            const bool registered = this->consumers.containsKey(consumerId);
+
+            if (registered) {
+                // Remove this Id both from the Sessions Map of Consumers and from the Connection.
+                // If the kernels parent is destroyed then it will get cleaned up now.
+                this->connection->removeDispatcher(consumerId);
+                this->consumers.remove(consumerId);
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Adds the producer to the session's producer list and then registers it with
+// the connection, after verifying that the session has not been closed.
+void ActiveMQSessionKernel::addProducer(Pointer<ActiveMQProducerKernel> producer) {
+
+    try {
+
+        checkClosed();
+
+        // Session-level bookkeeping first ...
+        config->producers.add(producer);
+
+        // ... then the connection's own list.
+        connection->addProducer(producer);
+    }
+    AMQ_CATCH_RETHROW( activemq::exceptions::ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, activemq::exceptions::ActiveMQException )
+    AMQ_CATCHALL_THROW( activemq::exceptions::ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Removes the producer with the given id from the connection and then from the
+// session's producer list.  Only the first producer whose id equals the given
+// one is removed; if none matches, the session's list is left unchanged.
+void ActiveMQSessionKernel::removeProducer(const Pointer<commands::ProducerId>& producerId) {
+
+    try {
+
+        this->checkClosed();
+
+        this->connection->removeProducer(producerId);
+
+        std::auto_ptr<Iterator<Pointer< ActiveMQProducerKernel> > > cursor(this->config->producers.iterator());
+
+        // Locate the matching producer first; the removal happens after the scan.
+        Pointer<ActiveMQProducerKernel> match;
+        while (cursor->hasNext()) {
+            Pointer<ActiveMQProducerKernel> candidate = cursor->next();
+            if (!candidate->getProducerId()->equals(*producerId)) {
+                continue;
+            }
+            match = candidate;
+            break;
+        }
+
+        if (match != NULL) {
+            this->config->producers.remove(match);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Begins a transaction on the transaction context when the session reports
+// itself as transacted and the context is not in an XA transaction.  In every
+// other case the call does nothing.
+void ActiveMQSessionKernel::doStartTransaction() {
+
+    if (!this->isTransacted()) {
+        return;
+    }
+
+    if (this->transaction->isInXATransaction()) {
+        return;
+    }
+
+    this->transaction->begin();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Wakes up the session executor; does nothing when the session has no executor.
+void ActiveMQSessionKernel::wakeup() {
+
+    if (this->executor.get() == NULL) {
+        return;
+    }
+
+    this->executor->wakeup();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a new ConsumerId carrying this connection's id, this session's id
+// and the next value from the session's consumer id sequence.
+Pointer<commands::ConsumerId> ActiveMQSessionKernel::getNextConsumerId() {
+    Pointer<commands::ConsumerId> nextId(new commands::ConsumerId());
+
+    nextId->setConnectionId(connection->getConnectionId().getValue());
+    nextId->setSessionId(sessionInfo->getSessionId()->getValue());
+    nextId->setValue(consumerIds.getNextSequenceId());
+
+    return nextId;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+// Creates a new ProducerId carrying this connection's id, this session's id
+// and the next value from the session's producer id sequence.
+Pointer<commands::ProducerId> ActiveMQSessionKernel::getNextProducerId() {
+    Pointer<commands::ProducerId> nextId(new ProducerId());
+
+    nextId->setConnectionId(connection->getConnectionId().getValue());
+    nextId->setSessionId(sessionInfo->getSessionId()->getValue());
+    nextId->setValue(producerIds.getNextSequenceId());
+
+    return nextId;
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h?rev=1307147&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h Thu Mar 29 22:27:32 2012
@@ -0,0 +1,513 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
+#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_
+
+#include <cms/Session.h>
+#include <cms/ExceptionListener.h>
+
+#include <activemq/util/Config.h>
+#include <activemq/util/Usage.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+#include <activemq/core/kernels/ActiveMQConsumerKernel.h>
+#include <activemq/core/kernels/ActiveMQProducerKernel.h>
+#include <activemq/commands/ActiveMQTempDestination.h>
+#include <activemq/commands/Response.h>
+#include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/ConsumerId.h>
+#include <activemq/commands/ProducerId.h>
+#include <activemq/commands/TransactionId.h>
+#include <activemq/core/Dispatcher.h>
+#include <activemq/core/MessageDispatchChannel.h>
+#include <activemq/util/LongSequenceGenerator.h>
+#include <activemq/threads/Scheduler.h>
+
+#include <decaf/lang/Pointer.h>
+#include <decaf/util/StlMap.h>
+#include <decaf/util/Properties.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
+
+#include <string>
+#include <memory>
+
+namespace activemq {
+namespace core {
+
+    class ActiveMQConnection;
+    class ActiveMQConsumer;
+    class ActiveMQProducer;
+    class ActiveMQSessionExecutor;
+
+namespace kernels {
+
+    using decaf::lang::Pointer;
+    using decaf::util::concurrent::atomic::AtomicBoolean;
+
+    class SessionConfig;
+
+    class AMQCPP_API ActiveMQSessionKernel : public virtual cms::Session, public Dispatcher {
+    private:
+
+        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
+                                     Pointer<activemq::core::kernels::ActiveMQConsumerKernel>,
+                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
+
+        friend class activemq::core::ActiveMQSessionExecutor;
+
+    protected:
+
+        SessionConfig* config;
+
+        /**
+         * SessionInfo for this Session
+         */
+        Pointer<commands::SessionInfo> sessionInfo;
+
+        /**
+         * Transaction Management object
+         */
+        Pointer<ActiveMQTransactionContext> transaction;
+
+        /**
+         * Connection
+         */
+        ActiveMQConnection* connection;
+
+        /**
+         * Map of consumers.
+         */
+        ConsumersMap consumers;
+
+        /**
+         * Indicates that this session has been closed; it is no longer
+         * usable after this becomes true.
+         */
+        AtomicBoolean closed;
+
+        /**
+         * Sends incoming messages to the registered consumers.
+         */
+        std::auto_ptr<ActiveMQSessionExecutor> executor;
+
+        /**
+         * This Sessions Acknowledgment mode.
+         */
+        cms::Session::AcknowledgeMode ackMode;
+
+        /**
+         * Next available Producer Id
+         */
+        util::LongSequenceGenerator producerIds;
+
+        /**
+         * Next available Producer Sequence Id
+         */
+        util::LongSequenceGenerator producerSequenceIds;
+
+        /**
+         * Next available Consumer Id
+         */
+        util::LongSequenceGenerator consumerIds;
+
+        /**
+         * Last Delivered Sequence Id
+         */
+        long long lastDeliveredSequenceId;
+
+    private:
+
+        ActiveMQSessionKernel(const ActiveMQSessionKernel&);
+        ActiveMQSessionKernel& operator=(const ActiveMQSessionKernel&);
+
+    public:
+
+        ActiveMQSessionKernel(ActiveMQConnection* connection,
+                              const Pointer<commands::SessionId>& id,
+                              cms::Session::AcknowledgeMode ackMode,
+                              const decaf::util::Properties& properties);
+
+        virtual ~ActiveMQSessionKernel();
+
+        /**
+         * Redispatches the given set of unconsumed messages to the consumers.
+         * @param unconsumedMessages - unconsumed messages to be redelivered.
+         */
+        virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
+
+        /**
+         * Starts asynchronous message delivery.
+         */
+        virtual void start();
+
+        /**
+         * Stops asynchronous message delivery.
+         */
+        virtual void stop();
+
+        /**
+         * Indicates whether or not the session is currently in the started
+         * state.
+         */
+        bool isStarted() const;
+
+        virtual bool isAutoAcknowledge() const {
+            return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
+        }
+
+        virtual bool isDupsOkAcknowledge() const {
+            return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
+        }
+
+        virtual bool isClientAcknowledge() const {
+            return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
+        }
+
+        virtual bool isIndividualAcknowledge() const {
+            return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
+        }
+
+        /**
+         * Fires the given exception to the exception listener of the connection
+         */
+        void fire(const exceptions::ActiveMQException& ex);
+
+    public:  // Methods from ActiveMQMessageDispatcher
+
+        /**
+         * Dispatches a message to a particular consumer.
+         * @param message - the message to be dispatched
+         */
+        virtual void dispatch(const Pointer<MessageDispatch>& message);
+
+    public:   // Implements Methods
+
+        virtual void close();
+
+        virtual void commit();
+
+        virtual void rollback();
+
+        virtual void recover();
+
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination);
+
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
+                                                     const std::string& selector);
+
+        virtual cms::MessageConsumer* createConsumer(const cms::Destination* destination,
+                                                     const std::string& selector,
+                                                     bool noLocal);
+
+        virtual cms::MessageConsumer* createDurableConsumer(const cms::Topic* destination,
+                                                            const std::string& name,
+                                                            const std::string& selector,
+                                                            bool noLocal = false);
+
+        virtual cms::MessageProducer* createProducer(const cms::Destination* destination);
+
+        virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue);
+
+        virtual cms::QueueBrowser* createBrowser(const cms::Queue* queue, const std::string& selector);
+
+        virtual cms::Queue* createQueue(const std::string& queueName);
+
+        virtual cms::Topic* createTopic(const std::string& topicName);
+
+        virtual cms::TemporaryQueue* createTemporaryQueue();
+
+        virtual cms::TemporaryTopic* createTemporaryTopic();
+
+        virtual cms::Message* createMessage();
+
+        virtual cms::BytesMessage* createBytesMessage();
+
+        virtual cms::BytesMessage* createBytesMessage(const unsigned char* bytes, int bytesSize);
+
+        virtual cms::StreamMessage* createStreamMessage();
+
+        virtual cms::TextMessage* createTextMessage();
+
+        virtual cms::TextMessage* createTextMessage( const std::string& text );
+
+        virtual cms::MapMessage* createMapMessage();
+
+        virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
+
+        virtual bool isTransacted() const;
+
+        virtual void unsubscribe(const std::string& name);
+
+   public:   // ActiveMQSession specific Methods
+
+        /**
+         * Sends a message from the Producer specified using this session's connection
+         * the message will be sent using the best available means depending on the
+         * configuration of the connection.
+         * <p>
+         * Asynchronous sends will be chosen if at all possible.
+         *
+         * @param message
+         *        The message to send to the broker.
+         * @param producer
+         *        The sending Producer
+         * @param usage
+         *        Pointer to a Usage tracker which if set will be increased by the size
+         *        of the given message.
+         *
+         * @throws CMSException
+         */
+        void send(cms::Message* message, kernels::ActiveMQProducerKernel* producer, util::Usage* usage);
+
+        /**
+         * This method gets any registered exception listener of this sessions
+         * connection and returns it.  Mainly intended for use by the objects
+         * that this session creates so that they can notify the client of
+         * exceptions that occur in the context of another thread.
+         * @returns cms::ExceptionListener pointer or NULL
+         */
+        cms::ExceptionListener* getExceptionListener();
+
+        /**
+         * Gets the Session Information object for this session; if the
+         * session is closed then this method throws an exception.
+         * @return SessionInfo Reference
+         */
+        const commands::SessionInfo& getSessionInfo() const {
+            this->checkClosed();
+            return *( this->sessionInfo );
+        }
+
+        /**
+         * Gets the Session Id object for this session; if the session
+         * is closed then this method throws an exception.
+         * @return SessionId Reference
+         */
+        const commands::SessionId& getSessionId() const {
+            this->checkClosed();
+            return *( this->sessionInfo->getSessionId() );
+        }
+
+        /**
+         * Gets the ActiveMQConnection that is associated with this session.
+         */
+        ActiveMQConnection* getConnection() const {
+            return this->connection;
+        }
+
+        /**
+         * Gets a Pointer to this Session's Scheduler instance
+         */
+        Pointer<threads::Scheduler> getScheduler() const;
+
+        /**
+         * Gets the currently set Last Delivered Sequence Id
+         *
+         * @returns long long containing the sequence id of the last delivered Message.
+         */
+        long long getLastDeliveredSequenceId() const {
+            return this->lastDeliveredSequenceId;
+        }
+
+        /**
+         * Sets the value of the Last Delivered Sequence Id
+         *
+         * @param value
+         *      The new value to assign to the Last Delivered Sequence Id property.
+         */
+        void setLastDeliveredSequenceId(long long value) {
+            this->lastDeliveredSequenceId = value;
+        }
+
+        /**
+         * Sends a Command to the broker without requesting any Response be returned.
+         *
+         * @param command
+         *      The message to send to the Broker.
+         *
+         * @throws ActiveMQException if not currently connected, or if the
+         *         operation fails for any reason.
+         */
+        void oneway(Pointer<commands::Command> command);
+
+        /**
+         * Sends a synchronous request and returns the response from the broker.
+         * Converts any error responses into an exception.
+         *
+         * @param command
+         *      The command to send to the broker.
+         * @param timeout
+         *      The time to wait for a response, default is zero or infinite.
+         *
+         * @returns Pointer to a Response object that the broker has returned for the Command sent.
+         *
+         * @throws ActiveMQException thrown if an error response was received
+         *         from the broker, or if any other error occurred.
+         */
+        Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
+
+        /**
+         * Adds a MessageConsumerKernel to this session registering it with the Connection and
+         * store a reference to it so the session can ensure that all resources are closed when
+         * the session is closed.
+         *
+         * @param consumer
+         *      The ActiveMQConsumerKernel instance to add to this session.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void addConsumer(Pointer<activemq::core::kernels::ActiveMQConsumerKernel> consumer);
+
+        /**
+         * Dispose of a MessageConsumer from this session.  Removes it from the Connection
+         * and clean up any resources associated with it.
+         *
+         * @param consumerId
+         *      The ConsumerId of the MessageConsumer to remove from this Session.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void removeConsumer(const Pointer<commands::ConsumerId>& consumerId);
+
+        /**
+         * Adds a MessageProducer to this session registering it with the Connection and store
+         * a reference to it so the session can ensure that all resources are closed when
+         * the session is closed.
+         *
+         * @param producer
+         *      The ActiveMQProducerKernel instance to add to this session.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void addProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> producer);
+
+        /**
+         * Dispose of a MessageProducer from this session.  Removes it from the Connection
+         * and clean up any resources associated with it.
+         *
+         * @param producerId
+         *      The ProducerId of the producer to remove from this session.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        void removeProducer(const Pointer<commands::ProducerId>& producerId);
+
+        /**
+         * Starts a Transaction for this Session if one is not already started.  If the
+         * session is not a Transacted Session, or it is currently in an XA transaction,
+         * then this method has no effect.  If a transaction is already in progress then
+         * this method has no effect.
+         *
+         * @throw ActiveMQException if an internal error occurs.
+         */
+        virtual void doStartTransaction();
+
+        /**
+         * Gets the Pointer to this Session's TransactionContext
+         *
+         * @return a Pointer to this Session's TransactionContext
+         */
+        Pointer<ActiveMQTransactionContext> getTransactionContext() {
+            return this->transaction;
+        }
+
+        /**
+         * Request that the Session inform all its consumers to Acknowledge all Message's
+         * that have been received so far.
+         */
+        void acknowledge();
+
+        /**
+         * Request that this Session inform all of its consumers to deliver their pending
+         * acks.
+         */
+        void deliverAcks();
+
+        /**
+         * Request that this Session inform all of its consumers to clear all messages that
+         * are currently in progress.
+         */
+        void clearMessagesInProgress();
+
+        /**
+         * Causes the Session to wakeup its executor and ensure all messages are dispatched.
+         */
+        void wakeup();
+
+        /**
+         * Get the Next available Consumer Id
+         * @return the next id in the sequence.
+         */
+        Pointer<commands::ConsumerId> getNextConsumerId();
+
+        /**
+         * Get the Next available Producer Id
+         * @return the next id in the sequence.
+         */
+        Pointer<commands::ProducerId> getNextProducerId();
+
+        /**
+         * Performs the actual Session close operations.  This method is meant for use
+         * by ActiveMQConnection, the connection object calls this when it has been
+         * closed to skip some of the extraneous processing done by the client level
+         * close method.
+         */
+        void doClose();
+
+        /**
+         * Cleans up the Session object's resources without attempting to send the
+         * Remove command to the broker, this can be called from ActiveMQConnection when
+         * it knows that the transport is down and the doClose method would throw an
+         * exception when it attempts to send the Remove Command.
+         */
+        void dispose();
+
+   private:
+
+       /**
+        * Get the Next available Producer Sequence Id
+        * @return the next id in the sequence.
+        */
+       long long getNextProducerSequenceId() {
+           return this->producerSequenceIds.getNextSequenceId();
+       }
+
+       // Checks for the closed state and throws if so.
+       void checkClosed() const;
+
+       // Send the Destination Creation Request to the Broker, alerting it
+       // that we've created a new Temporary Destination.
+       // @param tempDestination - The new Temporary Destination
+       void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
+
+       // Send the Destination Destruction Request to the Broker, alerting
+       // it that we've removed an existing Temporary Destination.
+       // @param tempDestination - The Temporary Destination to remove
+       void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
+
+       // Creates a new Temporary Destination name using the connection id
+       // and a rolling count.
+       // @returns a unique Temporary Destination name
+       std::string createTemporaryDestinationName();
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQSESSIONKERNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQSessionKernel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp?rev=1307147&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp Thu Mar 29 22:27:32 2012
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQXASessionKernel.h"
+
+#include <cms/TransactionInProgressException.h>
+#include <activemq/core/ActiveMQTransactionContext.h>
+
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::core::kernels;
+using namespace decaf;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXASessionKernel::ActiveMQXASessionKernel(ActiveMQConnection* connection,
+                                                 const Pointer<commands::SessionId>& sessionId,
+                                                 const decaf::util::Properties& properties ) :
+    ActiveMQSessionKernel(connection, sessionId, cms::Session::AUTO_ACKNOWLEDGE, properties) {  // base kernel is always constructed with AUTO_ACKNOWLEDGE
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQXASessionKernel::~ActiveMQXASessionKernel() {  // empty body: no cleanup is performed in this destructor itself
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASessionKernel::isTransacted() const {
+    return this->transaction->isInXATransaction();  // transacted state is whatever the transaction member reports
+}
+
+////////////////////////////////////////////////////////////////////////////////
+bool ActiveMQXASessionKernel::isAutoAcknowledge() const {
+    // Always report true so that the Session behaves like an Auto Ack session
+    // whenever no XA Transaction is active.
+    return true;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASessionKernel::doStartTransaction() {
+    // Intentionally a no-op: starting a transaction is controlled by the XAResource.
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASessionKernel::commit() {  // unconditionally throws; nothing is committed by this method
+    throw cms::TransactionInProgressException("Cannot commit inside an XASession");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQXASessionKernel::rollback() {  // unconditionally throws; nothing is rolled back by this method
+    throw cms::TransactionInProgressException("Cannot rollback inside an XASession");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::XAResource* ActiveMQXASessionKernel::getXAResource() const {
+    return this->transaction.get();  // the transaction member itself is handed out as the cms::XAResource
+}

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h?rev=1307147&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h (added)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h Thu Mar 29 22:27:32 2012
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CORE_KERNELS_ACTIVEMQXASESSIONKERNEL_H_
+#define _ACTIVEMQ_CORE_KERNELS_ACTIVEMQXASESSIONKERNEL_H_
+
+#include <activemq/util/Config.h>
+
+#include <cms/XASession.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
+
+namespace activemq {
+namespace core {
+namespace kernels {
+
+    using decaf::lang::Pointer;
+
+    class AMQCPP_API ActiveMQXASessionKernel : public cms::XASession, public ActiveMQSessionKernel {
+    public:
+
+        ActiveMQXASessionKernel(ActiveMQConnection* connection,
+                                const Pointer<commands::SessionId>& sessionId,
+                                const decaf::util::Properties& properties);
+
+        virtual ~ActiveMQXASessionKernel();
+
+    public:  // ActiveMQSessionKernel overrides that make the session XA aware
+
+        virtual bool isTransacted() const;
+
+        virtual bool isAutoAcknowledge() const;  // implementation always returns true
+
+        virtual void doStartTransaction();  // implementation is a no-op
+
+        virtual void commit();  // implementation always throws cms::TransactionInProgressException
+
+        virtual void rollback();  // implementation always throws cms::TransactionInProgressException
+
+    public:  // cms::XASession interface
+
+        virtual cms::XAResource* getXAResource() const;
+
+    };
+
+}}}
+
+#endif /* _ACTIVEMQ_CORE_KERNELS_ACTIVEMQXASESSIONKERNEL_H_ */

Propchange: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQXASessionKernel.h
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.cpp Thu Mar 29 22:27:32 2012
@@ -20,7 +20,7 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-Session::~Session() throw() {
+Session::~Session() {
 
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/Session.h Thu Mar 29 22:27:32 2012
@@ -147,7 +147,7 @@ namespace cms{
 
     public:
 
-        virtual ~Session() throw();
+        virtual ~Session();
 
         /**
          * Closes this session as well as any active child consumers or

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.cpp Thu Mar 29 22:27:32 2012
@@ -20,6 +20,6 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-XAConnection::~XAConnection() throw() {
+XAConnection::~XAConnection() {
 
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XAConnection.h Thu Mar 29 22:27:32 2012
@@ -37,7 +37,7 @@ namespace cms {
     class CMS_API XAConnection : public virtual cms::Connection {
     public:
 
-        virtual ~XAConnection() throw();
+        virtual ~XAConnection();
 
         /**
          * Creates an XASession object.

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.cpp Thu Mar 29 22:27:32 2012
@@ -20,5 +20,5 @@
 using namespace cms;
 
 ////////////////////////////////////////////////////////////////////////////////
-XASession::~XASession() throw() {
+XASession::~XASession() {
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/cms/XASession.h Thu Mar 29 22:27:32 2012
@@ -54,7 +54,7 @@ namespace cms {
     class CMS_API XASession : public virtual cms::Session {
     public:
 
-        virtual ~XASession() throw();
+        virtual ~XASession();
 
         /**
          * Returns the XA resource associated with this Session to the caller.



Mime
View raw message