activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r1307147 [2/3] - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main: ./ activemq/core/ activemq/core/kernels/ cms/
Date Thu, 29 Mar 2012 22:27:33 GMT
Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSession.h Thu Mar 29 22:27:32 2012
@@ -21,28 +21,13 @@
 #include <cms/ExceptionListener.h>
 
 #include <activemq/util/Config.h>
-#include <activemq/util/Usage.h>
 #include <activemq/exceptions/ActiveMQException.h>
-#include <activemq/core/ActiveMQTransactionContext.h>
 #include <activemq/core/kernels/ActiveMQConsumerKernel.h>
 #include <activemq/core/kernels/ActiveMQProducerKernel.h>
-#include <activemq/commands/ActiveMQTempDestination.h>
-#include <activemq/commands/Response.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/commands/SessionInfo.h>
-#include <activemq/commands/ConsumerInfo.h>
-#include <activemq/commands/ConsumerId.h>
-#include <activemq/commands/ProducerId.h>
-#include <activemq/commands/TransactionId.h>
-#include <activemq/core/Dispatcher.h>
-#include <activemq/core/MessageDispatchChannel.h>
-#include <activemq/util/LongSequenceGenerator.h>
-#include <activemq/threads/Scheduler.h>
 
 #include <decaf/lang/Pointer.h>
-#include <decaf/util/StlMap.h>
-#include <decaf/util/Properties.h>
-#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
-#include <decaf/util/concurrent/CopyOnWriteArrayList.h>
 
 #include <string>
 #include <memory>
@@ -53,102 +38,23 @@ namespace core{
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicBoolean;
 
-    class SessionConfig;
     class ActiveMQConnection;
-    class ActiveMQConsumer;
-    class ActiveMQMessage;
-    class ActiveMQProducer;
-    class ActiveMQConsumer;
-    class ActiveMQSessionExecutor;
-
-    class AMQCPP_API ActiveMQSession : public virtual cms::Session, public Dispatcher {
-    private:
-
-        typedef decaf::util::StlMap< Pointer<commands::ConsumerId>,
-                                     Pointer<activemq::core::kernels::ActiveMQConsumerKernel>,
-                                     commands::ConsumerId::COMPARATOR> ConsumersMap;
-
-        friend class ActiveMQSessionExecutor;
 
+    class AMQCPP_API ActiveMQSession : public virtual cms::Session {
     protected:
 
-        SessionConfig* config;
-
-        /**
-         * SessionInfo for this Session
-         */
-        Pointer<commands::SessionInfo> sessionInfo;
-
-        /**
-         * Transaction Management object
-         */
-        Pointer<ActiveMQTransactionContext> transaction;
-
-        /**
-         * Connection
-         */
-        ActiveMQConnection* connection;
-
-        /**
-         * Map of consumers.
-         */
-        ConsumersMap consumers;
-
-        /**
-         * Indicates that this connection has been closed, it is no longer
-         * usable after this becomes true
-         */
-        AtomicBoolean closed;
-
-        /**
-         * Sends incoming messages to the registered consumers.
-         */
-        std::auto_ptr<ActiveMQSessionExecutor> executor;
-
-        /**
-         * This Sessions Acknowledgment mode.
-         */
-        cms::Session::AcknowledgeMode ackMode;
-
-        /**
-         * Next available Producer Id
-         */
-        util::LongSequenceGenerator producerIds;
-
-        /**
-         * Next available Producer Sequence Id
-         */
-        util::LongSequenceGenerator producerSequenceIds;
-
-        /**
-         * Next available Consumer Id
-         */
-        util::LongSequenceGenerator consumerIds;
-
-        /**
-         * Last Delivered Sequence Id
-         */
-        long long lastDeliveredSequenceId;
+        Pointer<activemq::core::kernels::ActiveMQSessionKernel> kernel;
 
     private:
 
-        ActiveMQSession( const ActiveMQSession& );
-        ActiveMQSession& operator= ( const ActiveMQSession& );
+        ActiveMQSession(const ActiveMQSession&);
+        ActiveMQSession& operator=(const ActiveMQSession&);
 
     public:
 
-        ActiveMQSession(ActiveMQConnection* connection,
-                        const Pointer<commands::SessionId>& id,
-                        cms::Session::AcknowledgeMode ackMode,
-                        const decaf::util::Properties& properties);
+        ActiveMQSession(Pointer<activemq::core::kernels::ActiveMQSessionKernel> kernel);
 
-        virtual ~ActiveMQSession() throw();
-
-        /**
-         * Redispatches the given set of unconsumed messages to the consumers.
-         * @param unconsumedMessages - unconsumed messages to be redelivered.
-         */
-        virtual void redispatch(MessageDispatchChannel& unconsumedMessages);
+        virtual ~ActiveMQSession();
 
         /**
          * Stops asynchronous message delivery.
@@ -164,37 +70,10 @@ namespace core{
          * Indicates whether or not the session is currently in the started
          * state.
          */
-        bool isStarted() const;
-
-        virtual bool isAutoAcknowledge() const {
-            return this->ackMode == cms::Session::AUTO_ACKNOWLEDGE;
-        }
-
-        virtual bool isDupsOkAcknowledge() const {
-            return this->ackMode == cms::Session::DUPS_OK_ACKNOWLEDGE;
-        }
-
-        virtual bool isClientAcknowledge() const {
-            return this->ackMode == cms::Session::CLIENT_ACKNOWLEDGE;
-        }
-
-        virtual bool isIndividualAcknowledge() const {
-            return this->ackMode == cms::Session::INDIVIDUAL_ACKNOWLEDGE;
+        bool isStarted() const {
+            return this->kernel->isStarted();
         }
 
-        /**
-         * Fires the given exception to the exception listener of the connection
-         */
-        void fire(const exceptions::ActiveMQException& ex);
-
-    public:  // Methods from ActiveMQMessageDispatcher
-
-        /**
-         * Dispatches a message to a particular consumer.
-         * @param message - the message to be dispatched
-         */
-        virtual void dispatch(const Pointer<MessageDispatch>& message);
-
     public:   // Implements Methods
 
         virtual void close();
@@ -243,36 +122,21 @@ namespace core{
 
         virtual cms::TextMessage* createTextMessage();
 
-        virtual cms::TextMessage* createTextMessage( const std::string& text );
+        virtual cms::TextMessage* createTextMessage(const std::string& text);
 
         virtual cms::MapMessage* createMapMessage();
 
-        virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const;
+        virtual cms::Session::AcknowledgeMode getAcknowledgeMode() const {
+            return this->kernel->getAcknowledgeMode();
+        }
 
-        virtual bool isTransacted() const;
+        virtual bool isTransacted() const {
+            return this->kernel->isTransacted();
+        }
 
         virtual void unsubscribe(const std::string& name);
 
-   public:   // ActiveMQSession specific Methods
-
-        /**
-         * Sends a message from the Producer specified using this session's connection
-         * the message will be sent using the best available means depending on the
-         * configuration of the connection.
-         * <p>
-         * Asynchronous sends will be chosen if at all possible.
-         *
-         * @param message
-         *        The message to send to the broker.
-         * @param producer
-         *        The sending Producer
-         * @param usage
-         *        Pointer to a Usage tracker which if set will be increased by the size
-         *        of the given message.
-         *
-         * @throws CMSException
-         */
-        void send(cms::Message* message, kernels::ActiveMQProducerKernel* producer, util::Usage* usage);
+   public:  // ActiveMQSession specific Methods
 
         /**
          * This method gets any registered exception listener of this sessions
@@ -281,7 +145,9 @@ namespace core{
          * exceptions that occur in the context of another thread.
          * @returns cms::ExceptionListener pointer or NULL
          */
-        cms::ExceptionListener* getExceptionListener();
+        cms::ExceptionListener* getExceptionListener() {
+            return this->kernel->getExceptionListener();
+        }
 
         /**
          * Gets the Session Information object for this session, if the
@@ -289,8 +155,7 @@ namespace core{
          * @return SessionInfo Reference
          */
         const commands::SessionInfo& getSessionInfo() const {
-            this->checkClosed();
-            return *( this->sessionInfo );
+            return this->kernel->getSessionInfo();
         }
 
         /**
@@ -299,211 +164,16 @@ namespace core{
          * @return SessionId Reference
          */
         const commands::SessionId& getSessionId() const {
-            this->checkClosed();
-            return *( this->sessionInfo->getSessionId() );
+            return this->kernel->getSessionId();
         }
 
         /**
          * Gets the ActiveMQConnection that is associated with this session.
          */
         ActiveMQConnection* getConnection() const {
-            return this->connection;
-        }
-
-        /**
-         * Gets a Pointer to this Session's Scheduler instance
-         */
-        Pointer<threads::Scheduler> getScheduler() const;
-
-        /**
-         * Gets the currently set Last Delivered Sequence Id
-         *
-         * @returns long long containing the sequence id of the last delivered Message.
-         */
-        long long getLastDeliveredSequenceId() const {
-            return this->lastDeliveredSequenceId;
-        }
-
-        /**
-         * Sets the value of the Last Delivered Sequence Id
-         *
-         * @param value
-         *      The new value to assign to the Last Delivered Sequence Id property.
-         */
-        void setLastDeliveredSequenceId(long long value) {
-            this->lastDeliveredSequenceId = value;
+            return this->kernel->getConnection();
         }
 
-        /**
-         * Sends a Command to the broker without requesting any Response be returned.
-         * .
-         * @param command
-         *      The message to send to the Broker.
-         *
-         * @throws ActiveMQException if not currently connected, or if the
-         *         operation fails for any reason.
-         */
-        void oneway(Pointer<commands::Command> command);
-
-        /**
-         * Sends a synchronous request and returns the response from the broker.
-         * Converts any error responses into an exception.
-         *
-         * @param command
-         *      The command to send to the broker.
-         * @param timeout
-         *      The time to wait for a response, default is zero or infinite.
-         *
-         * @returns Pointer to a Response object that the broker has returned for the Command sent.
-         *
-         * @throws ActiveMQException thrown if an error response was received
-         *         from the broker, or if any other error occurred.
-         */
-        Pointer<commands::Response> syncRequest(Pointer<commands::Command> command, unsigned int timeout = 0);
-
-        /**
-         * Adds a MessageConsumerKernel to this session registering it with the Connection and
-         * store a reference to it so the session can ensure that all resources are closed when
-         * the session is closed.
-         *
-         * @param consumer
-         *      The ActiveMQConsumer instance to add to this session.
-         *
-         * @throw ActiveMQException if an internal error occurs.
-         */
-        void addConsumer(Pointer<activemq::core::kernels::ActiveMQConsumerKernel> consumer);
-
-        /**
-         * Dispose of a MessageConsumer from this session.  Removes it from the Connection
-         * and clean up any resources associated with it.
-         *
-         * @param consumerId
-         *      The ConsumerId of the MessageConsumer to remove from this Session.
-         *
-         * @throw ActiveMQException if an internal error occurs.
-         */
-        void removeConsumer(const Pointer<commands::ConsumerId>& consumerId);
-
-        /**
-         * Adds a MessageProducer to this session registering it with the Connection and store
-         * a reference to it so the session can ensure that all resources are closed when
-         * the session is closed.
-         *
-         * @param producer
-         *      The ActiveMQProducerKernel instance to add to this session.
-         *
-         * @throw ActiveMQException if an internal error occurs.
-         */
-        void addProducer(Pointer<activemq::core::kernels::ActiveMQProducerKernel> producer);
-
-        /**
-         * Dispose of a MessageProducer from this session.  Removes it from the Connection
-         * and clean up any resources associated with it.
-         *
-         * @param producerId
-         *      The ProducerId of the producer to remove to this session.
-         *
-         * @throw ActiveMQException if an internal error occurs.
-         */
-        void removeProducer(const Pointer<commands::ProducerId>& producerId);
-
-        /**
-         * Starts if not already start a Transaction for this Session.  If the session
-         * is not a Transacted Session then an exception is thrown.  If a transaction is
-         * already in progress then this method has no effect.
-         *
-         * @throw ActiveMQException if this is not a Transacted Session.
-         */
-        virtual void doStartTransaction();
-
-        /**
-         * Gets the Pointer to this Session's TransactionContext
-         *
-         * @return a Pointer to this Session's TransactionContext
-         */
-        Pointer<ActiveMQTransactionContext> getTransactionContext() {
-            return this->transaction;
-        }
-
-        /**
-         * Request that the Session inform all its consumers to Acknowledge all Message's
-         * that have been received so far.
-         */
-        void acknowledge();
-
-        /**
-         * Request that this Session inform all of its consumers to deliver their pending
-         * acks.
-         */
-        void deliverAcks();
-
-        /**
-         * Request that this Session inform all of its consumers to clear all messages that
-         * are currently in progress.
-         */
-        void clearMessagesInProgress();
-
-        /**
-         * Causes the Session to wakeup its executer and ensure all messages are dispatched.
-         */
-        void wakeup();
-
-        /**
-         * Get the Next available Consumer Id
-         * @return the next id in the sequence.
-         */
-        Pointer<commands::ConsumerId> getNextConsumerId();
-
-        /**
-         * Get the Next available Producer Id
-         * @return the next id in the sequence.
-         */
-        Pointer<commands::ProducerId> getNextProducerId();
-
-        /**
-         * Performs the actual Session close operations.  This method is meant for use
-         * by ActiveMQConnection, the connection object calls this when it has been
-         * closed to skip some of the extraneous processing done by the client level
-         * close method.
-         */
-        void doClose();
-
-        /**
-         * Cleans up the Session object's resources without attempting to send the
-         * Remove command to the broker, this can be called from ActiveMQConnection when
-         * it knows that the transport is down and the doClose method would throw an
-         * exception when it attempt to send the Remove Command.
-         */
-        void dispose();
-
-   private:
-
-       /**
-        * Get the Next available Producer Sequence Id
-        * @return the next id in the sequence.
-        */
-       long long getNextProducerSequenceId() {
-           return this->producerSequenceIds.getNextSequenceId();
-       }
-
-       // Checks for the closed state and throws if so.
-       void checkClosed() const;
-
-       // Send the Destination Creation Request to the Broker, alerting it
-       // that we've created a new Temporary Destination.
-       // @param tempDestination - The new Temporary Destination
-       void createTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
-
-       // Send the Destination Destruction Request to the Broker, alerting
-       // it that we've removed an existing Temporary Destination.
-       // @param tempDestination - The Temporary Destination to remove
-       void destroyTemporaryDestination(commands::ActiveMQTempDestination* tempDestination);
-
-       // Creates a new Temporary Destination name using the connection id
-       // and a rolling count.
-       // @returns a unique Temporary Destination name
-       std::string createTemporaryDestinationName();
-
     };
 
 }}

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.cpp Thu Mar 29 22:27:32 2012
@@ -19,6 +19,7 @@
 
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/kernels/ActiveMQConsumerKernel.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/core/ActiveMQSession.h>
 #include <activemq/core/FifoMessageDispatchChannel.h>
 #include <activemq/core/SimplePriorityMessageDispatchChannel.h>
@@ -36,7 +37,7 @@ using namespace decaf::util;
 using namespace decaf::util::concurrent;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQSessionExecutor::ActiveMQSessionExecutor(ActiveMQSession* session) :
+ActiveMQSessionExecutor::ActiveMQSessionExecutor(ActiveMQSessionKernel* session) :
     session(session), messageQueue(), taskRunner() {
 
     if (this->session->getConnection()->isMessagePrioritySupported()) {
@@ -64,7 +65,7 @@ ActiveMQSessionExecutor::~ActiveMQSessio
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::execute( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQSessionExecutor::execute(const Pointer<MessageDispatch>& dispatch) {
 
     // Add the data to the queue.
     this->messageQueue->enqueue(dispatch);
@@ -72,7 +73,7 @@ void ActiveMQSessionExecutor::execute( c
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::executeFirst( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQSessionExecutor::executeFirst(const Pointer<MessageDispatch>& dispatch) {
 
     // Add the data to the queue.
     this->messageQueue->enqueueFirst(dispatch);
@@ -119,7 +120,7 @@ void ActiveMQSessionExecutor::stop() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQSessionExecutor::dispatch( const Pointer<MessageDispatch>& dispatch ) {
+void ActiveMQSessionExecutor::dispatch(const Pointer<MessageDispatch>& dispatch) {
 
     try {
 
@@ -139,13 +140,10 @@ void ActiveMQSessionExecutor::dispatch( 
 
     } catch (decaf::lang::Exception& ex) {
         ex.setMark(__FILE__, __LINE__);
-        ex.printStackTrace();
     } catch (std::exception& ex) {
         ActiveMQException amqex(__FILE__, __LINE__, ex.what());
-        amqex.printStackTrace();
     } catch (...) {
         ActiveMQException amqex(__FILE__, __LINE__, "caught unknown exception");
-        amqex.printStackTrace();
     }
 }
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQSessionExecutor.h Thu Mar 29 22:27:32 2012
@@ -28,11 +28,13 @@
 
 namespace activemq{
 namespace core{
+namespace kernels{
+    class ActiveMQSessionKernel;
+}
 
     using decaf::lang::Pointer;
     using activemq::commands::MessageDispatch;
 
-    class ActiveMQSession;
     class ActiveMQConsumer;
 
     /**
@@ -43,7 +45,7 @@ namespace core{
     private:
 
         /** Session that is this executors parent. */
-        ActiveMQSession* session;
+        activemq::core::kernels::ActiveMQSessionKernel* session;
 
         /** The Channel that holds the waiting Messages for Dispatching. */
         Pointer<MessageDispatchChannel> messageQueue;
@@ -53,15 +55,15 @@ namespace core{
 
     private:
 
-        ActiveMQSessionExecutor( const ActiveMQSessionExecutor& );
-        ActiveMQSessionExecutor& operator= ( const ActiveMQSessionExecutor& );
+        ActiveMQSessionExecutor(const ActiveMQSessionExecutor&);
+        ActiveMQSessionExecutor& operator=(const ActiveMQSessionExecutor&);
 
     public:
 
         /**
          * Creates an un-started executor for the given session.
          */
-        ActiveMQSessionExecutor( ActiveMQSession* session );
+        ActiveMQSessionExecutor(activemq::core::kernels::ActiveMQSessionKernel* session);
 
         /**
          * Calls stop() then clear().
@@ -73,14 +75,14 @@ namespace core{
          * end of the queue.
          * @param data - the data to be dispatched.
          */
-        virtual void execute( const Pointer<MessageDispatch>& data );
+        virtual void execute(const Pointer<MessageDispatch>& data);
 
         /**
          * Executes the dispatch.  Adds the given data to the
          * beginning of the queue.
          * @param data - the data to be dispatched.
          */
-        virtual void executeFirst( const Pointer<MessageDispatch>& data );
+        virtual void executeFirst(const Pointer<MessageDispatch>& data);
 
         /**
          * Removes all messages in the Dispatch Channel so that non are delivered.
@@ -162,7 +164,7 @@ namespace core{
          * Dispatches a message to a particular consumer.
          * @param data - The message to be dispatched.
          */
-        virtual void dispatch( const Pointer<MessageDispatch>& data );
+        virtual void dispatch(const Pointer<MessageDispatch>& data);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.cpp Thu Mar 29 22:27:32 2012
@@ -19,7 +19,7 @@
 #include <cms/Xid.h>
 #include <cms/XAException.h>
 #include <cms/TransactionInProgressException.h>
-#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/core/ActiveMQConstants.h>
 #include <activemq/commands/TransactionInfo.h>
@@ -39,6 +39,7 @@ using namespace std;
 using namespace cms;
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::commands;
 using namespace activemq::exceptions;
 using namespace activemq::util;
@@ -55,8 +56,8 @@ namespace core{
     class TxContextData {
     private:
 
-        TxContextData( const TxContextData& );
-        TxContextData& operator= ( const TxContextData& );
+        TxContextData(const TxContextData&);
+        TxContextData& operator=(const TxContextData&);
 
     public:
 
@@ -80,8 +81,8 @@ namespace {
     class Finally {
     private:
 
-        Finally( const Finally& );
-        Finally& operator= ( const Finally& );
+        Finally(const Finally&);
+        Finally& operator=(const Finally&);
 
     private:
 
@@ -89,11 +90,11 @@ namespace {
 
     public:
 
-        Finally( decaf::util::StlSet< Pointer<Synchronization> >* syncs ) : syncs( syncs ) {
+        Finally(decaf::util::StlSet<Pointer<Synchronization> >* syncs) : syncs(syncs) {
         }
 
         ~Finally() {
-            if( this->syncs != NULL ) {
+            if (this->syncs != NULL) {
                 this->syncs->clear();
             }
         }
@@ -102,12 +103,12 @@ namespace {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQTransactionContext::ActiveMQTransactionContext( ActiveMQSession* session, const Properties& properties AMQCPP_UNUSED ) :
+ActiveMQTransactionContext::ActiveMQTransactionContext(ActiveMQSessionKernel* session, const Properties& properties AMQCPP_UNUSED) :
     context(new TxContextData()), session(session), connection(), synchronizations() {
 
     try {
 
-        if( session == NULL ) {
+        if (session == NULL) {
             throw NullPointerException(
                 __FILE__, __LINE__,
                 "ActiveMQTransactionContext::ActiveMQTransactionContext - "
@@ -130,18 +131,18 @@ ActiveMQTransactionContext::~ActiveMQTra
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void ActiveMQTransactionContext::addSynchronization( const Pointer<Synchronization>& sync ) {
+void ActiveMQTransactionContext::addSynchronization(const Pointer<Synchronization>& sync) {
 
-    synchronized( &this->synchronizations ) {
-        this->synchronizations.add( sync );
+    synchronized(&this->synchronizations) {
+        this->synchronizations.add(sync);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 void ActiveMQTransactionContext::removeSynchronization( const Pointer<Synchronization>& sync ) {
 
-    synchronized( &this->synchronizations ) {
-        this->synchronizations.remove( sync );
+    synchronized(&this->synchronizations) {
+        this->synchronizations.remove(sync);
     }
 }
 
@@ -150,31 +151,31 @@ void ActiveMQTransactionContext::begin()
 
     try{
 
-        if( isInXATransaction() ) {
+        if (isInXATransaction()) {
             throw cms::TransactionInProgressException(
                 "Cannot start a local transaction while an XA Transaction is in progress.");
         }
 
-        if( !isInTransaction() ) {
+        if (!isInTransaction()) {
 
-            synchronized( &this->synchronizations ) {
+            synchronized(&this->synchronizations) {
                 this->synchronizations.clear();
             }
 
             // Create the Id
-            Pointer<LocalTransactionId> id( new LocalTransactionId() );
-            id->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-            id->setValue( this->connection->getNextLocalTransactionId() );
+            Pointer<LocalTransactionId> id(new LocalTransactionId());
+            id->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+            id->setValue(this->connection->getNextLocalTransactionId());
 
             // Create and Populate the Info Command.
-            Pointer<TransactionInfo> transactionInfo( new TransactionInfo() );
-            transactionInfo->setConnectionId( id->getConnectionId() );
-            transactionInfo->setTransactionId( id );
-            transactionInfo->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+            Pointer<TransactionInfo> transactionInfo(new TransactionInfo());
+            transactionInfo->setConnectionId(id->getConnectionId());
+            transactionInfo->setTransactionId(id);
+            transactionInfo->setType(ActiveMQConstants::TRANSACTION_STATE_BEGIN);
 
-            this->connection->oneway( transactionInfo );
+            this->connection->oneway(transactionInfo);
 
-            this->context->transactionId = id.dynamicCast<TransactionId>();
+            this->context->transactionId = id.dynamicCast<TransactionId> ();
         }
     }
     AMQ_CATCH_RETHROW( ActiveMQException )
@@ -187,31 +188,28 @@ void ActiveMQTransactionContext::commit(
 
     try{
 
-        if( isInXATransaction() ) {
-            throw cms::TransactionInProgressException(
-                "Cannot Commit a local transaction while an XA Transaction is in progress.");
+        if (isInXATransaction()) {
+            throw cms::TransactionInProgressException("Cannot Commit a local transaction while an XA Transaction is in progress.");
         }
 
-        if( this->context->transactionId.get() == NULL ) {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransactionContext::commit - "
+        if (this->context->transactionId.get() == NULL) {
+            throw InvalidStateException(__FILE__, __LINE__, "ActiveMQTransactionContext::commit - "
                 "Commit called before transaction was started.");
         }
 
         this->beforeEnd();
 
         // Create and Populate the Info Command.
-        Pointer<TransactionInfo> info( new TransactionInfo() );
-        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        info->setTransactionId( this->context->transactionId );
-        info->setType( ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE );
+        Pointer<TransactionInfo> info(new TransactionInfo());
+        info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        info->setTransactionId(this->context->transactionId);
+        info->setType(ActiveMQConstants::TRANSACTION_STATE_COMMITONEPHASE);
 
         // Before we send the command NULL the id in case of an exception.
-        this->context->transactionId.reset( NULL );
+        this->context->transactionId.reset(NULL);
 
         // Commit the current Transaction
-        this->connection->syncRequest( info );
+        this->connection->syncRequest(info);
 
         this->afterCommit();
     }
@@ -225,31 +223,28 @@ void ActiveMQTransactionContext::rollbac
 
     try{
 
-        if( isInXATransaction() ) {
-            throw cms::TransactionInProgressException(
-                "Cannot Rollback a local transaction while an XA Transaction is in progress.");
+        if (isInXATransaction()) {
+            throw cms::TransactionInProgressException("Cannot Rollback a local transaction while an XA Transaction is in progress.");
         }
 
-        if( this->context->transactionId == NULL ) {
-            throw InvalidStateException(
-                __FILE__, __LINE__,
-                "ActiveMQTransactionContext::rollback - "
+        if (this->context->transactionId == NULL) {
+            throw InvalidStateException(__FILE__, __LINE__, "ActiveMQTransactionContext::rollback - "
                 "Rollback called before transaction was started.");
         }
 
         this->beforeEnd();
 
         // Create and Populate the Info Command.
-        Pointer<TransactionInfo> info( new TransactionInfo() );
-        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        info->setTransactionId( this->context->transactionId );
-        info->setType( ActiveMQConstants::TRANSACTION_STATE_ROLLBACK );
+        Pointer<TransactionInfo> info(new TransactionInfo());
+        info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        info->setTransactionId(this->context->transactionId);
+        info->setType(ActiveMQConstants::TRANSACTION_STATE_ROLLBACK);
 
         // Before we send the command NULL the id in case of an exception.
-        this->context->transactionId.reset( NULL );
+        this->context->transactionId.reset(NULL);
 
         // Roll back the current Transaction
-        this->connection->syncRequest( info );
+        this->connection->syncRequest(info);
 
         this->afterRollback();
     }
@@ -262,12 +257,12 @@ void ActiveMQTransactionContext::rollbac
 void ActiveMQTransactionContext::beforeEnd() {
 
     // Notify each registered Synchronization that we are ending this Transaction.
-    synchronized( &this->synchronizations ) {
+    synchronized(&this->synchronizations) {
 
         std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
-            this->synchronizations.iterator() );
+            this->synchronizations.iterator());
 
-        while( iter->hasNext() ) {
+        while (iter->hasNext()) {
             iter->next()->beforeEnd();
         }
     }
@@ -277,14 +272,14 @@ void ActiveMQTransactionContext::beforeE
 void ActiveMQTransactionContext::afterCommit() {
 
     // Notify each registered Synchronization that we committed this Transaction.
-    synchronized( &this->synchronizations ) {
+    synchronized(&this->synchronizations) {
 
-        Finally finalizer( &this->synchronizations );
+        Finally finalizer(&this->synchronizations);
 
-        std::auto_ptr<decaf::util::Iterator< Pointer<Synchronization> > > iter(
-            this->synchronizations.iterator() );
+        std::auto_ptr<decaf::util::Iterator<Pointer<Synchronization> > > iter(
+            this->synchronizations.iterator());
 
-        while( iter->hasNext() ) {
+        while (iter->hasNext()) {
             iter->next()->afterCommit();
         }
     }
@@ -294,7 +289,7 @@ void ActiveMQTransactionContext::afterCo
 void ActiveMQTransactionContext::afterRollback() {
 
     // Notify each registered Synchronization that we rolled back this Transaction.
-    synchronized( &this->synchronizations ) {
+    synchronized(&this->synchronizations) {
 
         Finally finalizer( &this->synchronizations );
 
@@ -657,85 +652,85 @@ void ActiveMQTransactionContext::setXid(
     try {
         this->connection->checkClosedOrFailed();
         this->connection->ensureConnectionInfoSent();
-    } catch( Exception& e ) {
-        throw toXAException( e );
-    } catch( CMSException& e ) {
-        throw toXAException( e );
+    } catch (Exception& e) {
+        throw toXAException(e);
+    } catch (CMSException& e) {
+        throw toXAException(e);
     }
 
-    if( xid != NULL ) {
+    if (xid != NULL) {
 
         // Associate this new Xid with this Transaction as the root of the TX.
-        this->context->associatedXid.reset( xid->clone() );
-        this->context->transactionId.reset( new XATransactionId( xid ) );
+        this->context->associatedXid.reset(xid->clone());
+        this->context->transactionId.reset(new XATransactionId(xid));
 
-        Pointer<TransactionInfo> info( new TransactionInfo() );
-        info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-        info->setTransactionId( this->context->transactionId );
-        info->setType( ActiveMQConstants::TRANSACTION_STATE_BEGIN );
+        Pointer<TransactionInfo> info(new TransactionInfo());
+        info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+        info->setTransactionId(this->context->transactionId);
+        info->setType(ActiveMQConstants::TRANSACTION_STATE_BEGIN);
 
         try {
-            this->connection->oneway( info );
-        } catch( Exception& e ) {
-            throw toXAException( e );
-        } catch( CMSException& e ) {
-            throw toXAException( e );
+            this->connection->oneway(info);
+        } catch (Exception& e) {
+            throw toXAException(e);
+        } catch (CMSException& e) {
+            throw toXAException(e);
         }
 
     } else {
 
-        if( this->context->transactionId != NULL ) {
+        if (this->context->transactionId != NULL) {
 
-            Pointer<TransactionInfo> info( new TransactionInfo() );
-            info->setConnectionId( this->connection->getConnectionInfo().getConnectionId() );
-            info->setTransactionId( this->context->transactionId );
-            info->setType( ActiveMQConstants::TRANSACTION_STATE_END );
+            Pointer<TransactionInfo> info(new TransactionInfo());
+            info->setConnectionId(this->connection->getConnectionInfo().getConnectionId());
+            info->setTransactionId(this->context->transactionId);
+            info->setType(ActiveMQConstants::TRANSACTION_STATE_END);
 
             try {
-                this->connection->syncRequest( info );
-            } catch( CMSException& e ) {
-                throw toXAException( e );
+                this->connection->syncRequest(info);
+            } catch (CMSException& e) {
+                throw toXAException(e);
             }
         }
 
         // remove the association currently in place.
-        this->context->associatedXid.reset( NULL );
-        this->context->transactionId.reset( NULL );
+        this->context->associatedXid.reset(NULL);
+        this->context->transactionId.reset(NULL);
     }
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQTransactionContext::equals( const cms::Xid* local, const cms::Xid* remote ) {
 
-    if( local == remote ) {
+    if (local == remote) {
         return true;
     }
 
-    if( ( local == NULL ) ^ ( remote == NULL ) ) {
+    if ((local == NULL) ^ (remote == NULL)) {
         return false;
     }
 
-    if( local->getFormatId() != remote->getFormatId() ) {
+    if (local->getFormatId() != remote->getFormatId()) {
         return false;
     } else {
 
-        std::vector<unsigned char> localBQual( Xid::MAXBQUALSIZE );
-        std::vector<unsigned char> remoteBQual( Xid::MAXBQUALSIZE );
+        std::vector<unsigned char> localBQual(Xid::MAXBQUALSIZE);
+        std::vector<unsigned char> remoteBQual(Xid::MAXBQUALSIZE);
 
-        local->getBranchQualifier( &localBQual[0], Xid::MAXBQUALSIZE );
-        remote->getBranchQualifier( &remoteBQual[0], Xid::MAXBQUALSIZE );
+        local->getBranchQualifier(&localBQual[0], Xid::MAXBQUALSIZE);
+        remote->getBranchQualifier(&remoteBQual[0], Xid::MAXBQUALSIZE);
 
-        if( localBQual != remoteBQual ) {
+        if (localBQual != remoteBQual) {
             return false;
         }
 
-        std::vector<unsigned char> localGTXID( Xid::MAXBQUALSIZE );
-        std::vector<unsigned char> remoteGTXID( Xid::MAXBQUALSIZE );
+        std::vector<unsigned char> localGTXID(Xid::MAXBQUALSIZE);
+        std::vector<unsigned char> remoteGTXID(Xid::MAXBQUALSIZE);
 
-        local->getGlobalTransactionId( &localGTXID[0], Xid::MAXGTRIDSIZE );
-        remote->getGlobalTransactionId( &remoteGTXID[0], Xid::MAXGTRIDSIZE );
+        local->getGlobalTransactionId(&localGTXID[0], Xid::MAXGTRIDSIZE);
+        remote->getGlobalTransactionId(&remoteGTXID[0], Xid::MAXGTRIDSIZE);
 
-        if( localGTXID != remoteGTXID ) {
+        if (localGTXID != remoteGTXID) {
             return false;
         }
     }
@@ -749,16 +744,16 @@ std::string ActiveMQTransactionContext::
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-XAException ActiveMQTransactionContext::toXAException( decaf::lang::Exception& ex ) {
-    CMSException cmsEx = CMSExceptionSupport::create( ex );
-    XAException xae( ex.getMessage(), &cmsEx );
-    xae.setErrorCode( XAException::XAER_RMFAIL );
+XAException ActiveMQTransactionContext::toXAException(decaf::lang::Exception& ex) {
+    CMSException cmsEx = CMSExceptionSupport::create(ex);
+    XAException xae(ex.getMessage(), &cmsEx);
+    xae.setErrorCode(XAException::XAER_RMFAIL);
     return xae;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-XAException ActiveMQTransactionContext::toXAException( cms::CMSException& ex ) {
-    XAException xae( ex.getMessage(), &ex );
-    xae.setErrorCode( XAException::XAER_RMFAIL );
+XAException ActiveMQTransactionContext::toXAException(cms::CMSException& ex) {
+    XAException xae(ex.getMessage(), &ex);
+    xae.setErrorCode(XAException::XAER_RMFAIL);
     return xae;
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransactionContext.h Thu Mar 29 22:27:32 2012
@@ -35,13 +35,15 @@
 #include <decaf/util/Properties.h>
 #include <decaf/util/concurrent/Mutex.h>
 
-namespace activemq{
-namespace core{
+namespace activemq {
+namespace core {
+namespace kernels {
+    class ActiveMQSessionKernel;
+}
 
     using decaf::lang::Pointer;
 
     class LocalTransactionEventListener;
-    class ActiveMQSession;
     class ActiveMQConnection;
     class TxContextData;
 
@@ -61,7 +63,7 @@ namespace core{
         TxContextData* context;
 
         // Session this Transaction is associated with
-        ActiveMQSession* session;
+        activemq::core::kernels::ActiveMQSessionKernel* session;
 
         // The Connection that is the parent of the Session.
         ActiveMQConnection* connection;
@@ -71,8 +73,8 @@ namespace core{
 
     private:
 
-        ActiveMQTransactionContext( const ActiveMQTransactionContext& );
-        ActiveMQTransactionContext& operator= ( const ActiveMQTransactionContext& );
+        ActiveMQTransactionContext(const ActiveMQTransactionContext&);
+        ActiveMQTransactionContext& operator=(const ActiveMQTransactionContext&);
 
     public:
 
@@ -84,8 +86,8 @@ namespace core{
          * @param properties
          *      Configuration parameters for this object
          */
-        ActiveMQTransactionContext( ActiveMQSession* session,
-                                    const decaf::util::Properties& properties );
+        ActiveMQTransactionContext(activemq::core::kernels::ActiveMQSessionKernel* session,
+                                   const decaf::util::Properties& properties);
 
         virtual ~ActiveMQTransactionContext();
 
@@ -93,13 +95,13 @@ namespace core{
          * Adds a Synchronization to this Transaction.
          * @param sync - The Synchronization instance to add.
          */
-        virtual void addSynchronization( const Pointer<Synchronization>& sync );
+        virtual void addSynchronization(const Pointer<Synchronization>& sync);
 
         /**
          * Removes a Synchronization to this Transaction.
          * @param sync - The Synchronization instance to add.
          */
-        virtual void removeSynchronization( const Pointer<Synchronization>& sync );
+        virtual void removeSynchronization(const Pointer<Synchronization>& sync);
 
         /**
          * Begins a new transaction if one is not currently in progress.
@@ -153,33 +155,33 @@ namespace core{
 
     public:  // XAResource implementation.
 
-        virtual void commit( const cms::Xid* xid, bool onePhase );
+        virtual void commit(const cms::Xid* xid, bool onePhase);
 
-        virtual void end( const cms::Xid* xid, int flags );
+        virtual void end(const cms::Xid* xid, int flags);
 
-        virtual void forget( const cms::Xid* xid );
+        virtual void forget(const cms::Xid* xid);
 
         virtual int getTransactionTimeout() const;
 
-        virtual bool isSameRM( const cms::XAResource* theXAResource );
+        virtual bool isSameRM(const cms::XAResource* theXAResource);
 
-        virtual int prepare( const cms::Xid* xid );
+        virtual int prepare(const cms::Xid* xid);
 
-        virtual int recover(int flag, cms::Xid** recovered );
+        virtual int recover(int flag, cms::Xid** recovered);
 
-        virtual void rollback( const cms::Xid* xid );
+        virtual void rollback(const cms::Xid* xid);
 
-        virtual bool setTransactionTimeout( int seconds );
+        virtual bool setTransactionTimeout(int seconds);
 
-        virtual void start( const cms::Xid* xid, int flags );
+        virtual void start(const cms::Xid* xid, int flags);
 
     private:
 
         std::string getResourceManagerId() const;
-        void setXid( const cms::Xid* xid );
-        bool equals( const cms::Xid* local, const cms::Xid* remote );
-        cms::XAException toXAException( cms::CMSException& ex );
-        cms::XAException toXAException( decaf::lang::Exception& ex );
+        void setXid(const cms::Xid* xid);
+        bool equals(const cms::Xid* local, const cms::Xid* remote);
+        cms::XAException toXAException(cms::CMSException& ex);
+        cms::XAException toXAException(decaf::lang::Exception& ex);
 
         void beforeEnd();
         void afterCommit();

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.cpp Thu Mar 29 22:27:32 2012
@@ -18,40 +18,43 @@
 #include "ActiveMQXAConnection.h"
 
 #include <activemq/core/ActiveMQXASession.h>
+#include <activemq/core/kernels/ActiveMQXASessionKernel.h>
 #include <activemq/util/CMSExceptionSupport.h>
 
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
 using namespace activemq::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQXAConnection::ActiveMQXAConnection( const Pointer<transport::Transport>& transport,
-                                            const Pointer<decaf::util::Properties>& properties )
-  : ActiveMQConnection(transport, properties ) {
+ActiveMQXAConnection::ActiveMQXAConnection(const Pointer<transport::Transport>& transport,
+                                           const Pointer<decaf::util::Properties>& properties)
+  : ActiveMQConnection(transport, properties) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQXAConnection::~ActiveMQXAConnection() throw() {
+ActiveMQXAConnection::~ActiveMQXAConnection() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::XASession* ActiveMQXAConnection::createXASession() {
-    return dynamic_cast<cms::XASession*>( this->createSession( cms::Session::SESSION_TRANSACTED ) );
+    return dynamic_cast<cms::XASession*>(this->createSession(cms::Session::SESSION_TRANSACTED));
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-cms::Session* ActiveMQXAConnection::createSession( cms::Session::AcknowledgeMode ackMode AMQCPP_UNUSED ) {
+cms::Session* ActiveMQXAConnection::createSession(cms::Session::AcknowledgeMode ackMode AMQCPP_UNUSED) {
 
     try {
 
         checkClosedOrFailed();
         ensureConnectionInfoSent();
 
-        // Create the session instance.
-        cms::Session* session = new ActiveMQXASession(
-            this, getNextSessionId(), this->getProperties() );
+        Pointer<ActiveMQXASessionKernel> session(
+            new ActiveMQXASessionKernel(this, getNextSessionId(), this->getProperties()));
 
-        return session;
+        this->addSession(session);
+
+        return new ActiveMQXASession(session);
     }
     AMQ_CATCH_ALL_THROW_CMSEXCEPTION()
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXAConnection.h Thu Mar 29 22:27:32 2012
@@ -28,23 +28,22 @@ namespace core {
 
     using decaf::lang::Pointer;
 
-    class AMQCPP_API ActiveMQXAConnection : public cms::XAConnection,
-                                            public ActiveMQConnection {
+    class AMQCPP_API ActiveMQXAConnection : public cms::XAConnection, public ActiveMQConnection {
     private:
 
-        ActiveMQXAConnection( const ActiveMQXAConnection& );
-        ActiveMQXAConnection& operator= ( const ActiveMQXAConnection& );
+        ActiveMQXAConnection(const ActiveMQXAConnection&);
+        ActiveMQXAConnection& operator= (const ActiveMQXAConnection&);
 
     public:
 
-        ActiveMQXAConnection( const Pointer<transport::Transport>& transport,
-                              const Pointer<decaf::util::Properties>& properties );
+        ActiveMQXAConnection(const Pointer<transport::Transport>& transport,
+                             const Pointer<decaf::util::Properties>& properties);
 
-        virtual ~ActiveMQXAConnection() throw();
+        virtual ~ActiveMQXAConnection();
 
         virtual cms::XASession* createXASession();
 
-        virtual cms::Session* createSession( cms::Session::AcknowledgeMode ackMode );
+        virtual cms::Session* createSession(cms::Session::AcknowledgeMode ackMode);
 
     };
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.cpp Thu Mar 29 22:27:32 2012
@@ -22,23 +22,22 @@
 
 using namespace activemq;
 using namespace activemq::core;
+using namespace activemq::core::kernels;
+using namespace decaf;
 using namespace decaf::lang;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQXASession::ActiveMQXASession( ActiveMQConnection* connection,
-                                      const Pointer<commands::SessionId>& sessionId,
-                                      const decaf::util::Properties& properties ) :
-    ActiveMQSession( connection, sessionId, cms::Session::AUTO_ACKNOWLEDGE, properties ) {
-
+ActiveMQXASession::ActiveMQXASession(Pointer<ActiveMQXASessionKernel> kernel) :
+    ActiveMQSession(kernel), xaKernel(kernel) {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQXASession::~ActiveMQXASession() throw() {
+ActiveMQXASession::~ActiveMQXASession() {
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 bool ActiveMQXASession::isTransacted() const {
-    return this->transaction->isInXATransaction();
+    return this->xaKernel->isTransacted();
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -65,5 +64,5 @@ void ActiveMQXASession::rollback() {
 
 ////////////////////////////////////////////////////////////////////////////////
 cms::XAResource* ActiveMQXASession::getXAResource() const {
-    return this->transaction.get();
+    return this->xaKernel->getXAResource();
 }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/ActiveMQXASession.h Thu Mar 29 22:27:32 2012
@@ -22,21 +22,23 @@
 
 #include <cms/XASession.h>
 #include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/kernels/ActiveMQXASessionKernel.h>
 
 namespace activemq {
 namespace core {
 
     using decaf::lang::Pointer;
 
-    class AMQCPP_API ActiveMQXASession : public cms::XASession,
-                                         public ActiveMQSession {
+    class AMQCPP_API ActiveMQXASession : public cms::XASession, public ActiveMQSession {
+    private:
+
+        Pointer<activemq::core::kernels::ActiveMQXASessionKernel> xaKernel;
+
     public:
 
-        ActiveMQXASession( ActiveMQConnection* connection,
-                           const Pointer<commands::SessionId>& sessionId,
-                           const decaf::util::Properties& properties );
+        ActiveMQXASession(Pointer<activemq::core::kernels::ActiveMQXASessionKernel> kernel);
 
-        virtual ~ActiveMQXASession() throw();
+        virtual ~ActiveMQXASession();
 
     public:  // Override ActiveMQSession methods to make them XA Aware
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.cpp Thu Mar 29 22:27:32 2012
@@ -43,6 +43,7 @@
 #include <activemq/core/FifoMessageDispatchChannel.h>
 #include <activemq/core/SimplePriorityMessageDispatchChannel.h>
 #include <activemq/core/RedeliveryPolicy.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/threads/Scheduler.h>
 #include <cms/ExceptionListener.h>
 #include <memory>
@@ -195,23 +196,23 @@ namespace kernels {
     class ClientAckHandler : public ActiveMQAckHandler {
     private:
 
-        ActiveMQSession* session;
+        ActiveMQSessionKernel* session;
 
     private:
 
-        ClientAckHandler( const ClientAckHandler& );
-        ClientAckHandler& operator= ( const ClientAckHandler& );
+        ClientAckHandler(const ClientAckHandler&);
+        ClientAckHandler& operator=(const ClientAckHandler&);
 
     public:
 
-        ClientAckHandler( ActiveMQSession* session ) : session(session) {
+        ClientAckHandler( ActiveMQSessionKernel* session ) : session(session) {
             if( session == NULL ) {
                 throw NullPointerException(
                     __FILE__, __LINE__, "Ack Handler Created with NULL Session.");
             }
         }
 
-        void acknowledgeMessage( const commands::Message* message AMQCPP_UNUSED ) {
+        void acknowledgeMessage(const commands::Message* message AMQCPP_UNUSED ) {
 
             try {
                 this->session->acknowledge();
@@ -300,7 +301,7 @@ namespace kernels {
 }}}
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSession* session,
+ActiveMQConsumerKernel::ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
                                                const Pointer<ConsumerId>& id,
                                                const Pointer<ActiveMQDestination>& destination,
                                                const std::string& name,

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQConsumerKernel.h Thu Mar 29 22:27:32 2012
@@ -37,12 +37,12 @@
 
 namespace activemq {
 namespace core {
-    class ActiveMQSession;
 namespace kernels {
 
     using decaf::lang::Pointer;
     using decaf::util::concurrent::atomic::AtomicBoolean;
 
+    class ActiveMQSessionKernel;
     class ActiveMQConsumerKernelConfig;
 
     class AMQCPP_API ActiveMQConsumerKernel : public cms::MessageConsumer, public Dispatcher {
@@ -56,7 +56,7 @@ namespace kernels {
         /**
          * The ActiveMQSession that owns this class instance.
          */
-        ActiveMQSession* session;
+        ActiveMQSessionKernel* session;
 
         /**
          * The ConsumerInfo object for this class instance.
@@ -65,12 +65,12 @@ namespace kernels {
 
     private:
 
-        ActiveMQConsumerKernel( const ActiveMQConsumerKernel& );
-        ActiveMQConsumerKernel& operator= ( const ActiveMQConsumerKernel& );
+        ActiveMQConsumerKernel(const ActiveMQConsumerKernel&);
+        ActiveMQConsumerKernel& operator=(const ActiveMQConsumerKernel&);
 
     public:
 
-        ActiveMQConsumerKernel(ActiveMQSession* session,
+        ActiveMQConsumerKernel(ActiveMQSessionKernel* session,
                                const Pointer<commands::ConsumerId>& id,
                                const Pointer<commands::ActiveMQDestination>& destination,
                                const std::string& name,
@@ -94,17 +94,17 @@ namespace kernels {
 
         virtual cms::Message* receive();
 
-        virtual cms::Message* receive( int millisecs );
+        virtual cms::Message* receive(int millisecs);
 
         virtual cms::Message* receiveNoWait();
 
-        virtual void setMessageListener( cms::MessageListener* listener );
+        virtual void setMessageListener(cms::MessageListener* listener);
 
         virtual cms::MessageListener* getMessageListener() const;
 
         virtual std::string getMessageSelector() const;
 
-        virtual void acknowledge( const Pointer<commands::MessageDispatch>& dispatch );
+        virtual void acknowledge(const Pointer<commands::MessageDispatch>& dispatch);
 
     public:  // Dispatcher Methods
 
@@ -174,7 +174,7 @@ namespace kernels {
          * Sets the Synchronization Registered state of this consumer.
          * @param value - true if registered false otherwise.
          */
-        void setSynchronizationRegistered( bool value );
+        void setSynchronizationRegistered(bool value);
 
         /**
          * Deliver any pending messages to the registered MessageListener if there
@@ -214,7 +214,7 @@ namespace kernels {
          * @param value
          *      The new value to assign to the Last Delivered Sequence Id property.
          */
-        void setLastDeliveredSequenceId( long long value );
+        void setLastDeliveredSequenceId(long long value);
 
         /**
          * @returns the number of Message's this consumer is waiting to Dispatch.
@@ -230,7 +230,7 @@ namespace kernels {
          * @param policy
          *      Pointer to a Redelivery Policy object that his Consumer will use.
          */
-        void setRedeliveryPolicy( RedeliveryPolicy* policy );
+        void setRedeliveryPolicy(RedeliveryPolicy* policy);
 
         /**
          * Gets a pointer to this Consumer's Redelivery Policy object, the Consumer
@@ -246,7 +246,7 @@ namespace kernels {
          * @param error
          *      The error that is to be thrown when a Receive call is made.
          */
-        void setFailureError( decaf::lang::Exception* error );
+        void setFailureError(decaf::lang::Exception* error);
 
         /**
          * Gets the error that caused this Consumer to be in a Failed state, or NULL if
@@ -273,28 +273,26 @@ namespace kernels {
          * @throws InvalidStateException if this consumer is closed upon
          *         entering this method.
          */
-        Pointer<MessageDispatch> dequeue( long long timeout );
+        Pointer<MessageDispatch> dequeue(long long timeout);
 
         /**
          * Pre-consume processing
          * @param dispatch - the message being consumed.
          */
-        void beforeMessageIsConsumed(
-            const Pointer<commands::MessageDispatch>& dispatch );
+        void beforeMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch);
 
         /**
          * Post-consume processing
          * @param dispatch - the consumed message
          * @param messageExpired - flag indicating if the message has expired.
          */
-        void afterMessageIsConsumed(
-            const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired );
+        void afterMessageIsConsumed(const Pointer<commands::MessageDispatch>& dispatch, bool messageExpired);
 
     private:
 
         // Using options from the Destination URI override any settings that are
         // defined for this consumer.
-        void applyDestinationOptions( const Pointer<commands::ConsumerInfo>& info );
+        void applyDestinationOptions(const Pointer<commands::ConsumerInfo>& info);
 
         // If supported sends a message pull request to the service provider asking
         // for the delivery of a new message.  This is used in the case where the
@@ -302,7 +300,7 @@ namespace kernels {
         // capable of delivering messages on a pull basis.  No request is made if
         // there are already messages in the unconsumed queue since there's no need
         // for a server round-trip in that instance.
-        void sendPullRequest( long long timeout );
+        void sendPullRequest(long long timeout);
 
         // Checks for the closed state and throws if so.
         void checkClosed() const;
@@ -310,10 +308,10 @@ namespace kernels {
         // Sends an ack as needed in order to keep them coming in if the current
         // ack mode allows the consumer to receive up to the prefetch limit before
         // an real ack is sent.
-        void ackLater( const Pointer<commands::MessageDispatch>& message, int ackType );
+        void ackLater(const Pointer<commands::MessageDispatch>& message, int ackType);
 
         // Create an Ack Message that acks all messages that have been delivered so far.
-        Pointer<commands::MessageAck> makeAckForAllDeliveredMessages( int type );
+        Pointer<commands::MessageAck> makeAckForAllDeliveredMessages(int type);
 
         // Should Acks be sent on each dispatched message
         bool isAutoAcknowledgeEach() const;

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.cpp Thu Mar 29 22:27:32 2012
@@ -18,7 +18,7 @@
 #include "ActiveMQProducerKernel.h"
 
 #include <cms/Message.h>
-#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/kernels/ActiveMQSessionKernel.h>
 #include <activemq/core/ActiveMQConnection.h>
 #include <activemq/commands/RemoveInfo.h>
 #include <activemq/util/CMSExceptionSupport.h>
@@ -41,7 +41,7 @@ using namespace decaf::lang;
 using namespace decaf::lang::exceptions;
 
 ////////////////////////////////////////////////////////////////////////////////
-ActiveMQProducerKernel::ActiveMQProducerKernel(ActiveMQSession* session,
+ActiveMQProducerKernel::ActiveMQProducerKernel(ActiveMQSessionKernel* session,
                                                const Pointer<commands::ProducerId>& producerId,
                                                const Pointer<ActiveMQDestination>& destination,
                                                long long sendTimeout) : disableTimestamps(false),

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h?rev=1307147&r1=1307146&r2=1307147&view=diff
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/core/kernels/ActiveMQProducerKernel.h Thu Mar 29 22:27:32 2012
@@ -33,11 +33,12 @@
 
 namespace activemq {
 namespace core {
-    class ActiveMQSession;
 namespace kernels {
 
     using decaf::lang::Pointer;
 
+    class ActiveMQSessionKernel;
+
     class AMQCPP_API ActiveMQProducerKernel : public cms::MessageProducer {
     private:
 
@@ -60,7 +61,7 @@ namespace kernels {
         long long sendTimeout;
 
         // Session that this producer sends to.
-        ActiveMQSession* session;
+        ActiveMQSessionKernel* session;
 
         // This Producer's protocol-specific info object
         Pointer<commands::ProducerInfo> producerInfo;
@@ -94,7 +95,7 @@ namespace kernels {
          * @param sendTimeout
          *        The configured send timeout for this Producer.
          */
-        ActiveMQProducerKernel(ActiveMQSession* session,
+        ActiveMQProducerKernel(ActiveMQSessionKernel* session,
                                const Pointer<commands::ProducerId>& producerId,
                                const Pointer<commands::ActiveMQDestination>& destination,
                                long long sendTimeout);



Mime
View raw message