activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r418749 [7/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/ac...
Date Mon, 03 Jul 2006 11:51:54 GMT
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,343 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "ActiveMQTransaction.h"
+
+#include <activemq/exceptions/NullPointerException.h>
+#include <activemq/core/ActiveMQSession.h>
+#include <activemq/core/ActiveMQConnection.h>
+#include <activemq/core/ActiveMQConsumer.h>
+#include <activemq/core/ActiveMQMessage.h>
+#include <activemq/util/Integer.h>
+
+#include <activemq/concurrent/ThreadPool.h>
+
+using namespace std;
+using namespace cms;
+using namespace activemq;
+using namespace activemq::core;
+using namespace activemq::util;
+using namespace activemq::connector;
+using namespace activemq::concurrent;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQTransaction::ActiveMQTransaction( ActiveMQConnection* connection,
+                                          ActiveMQSession* session,
+                                          const Properties& properties )
+{
+    try
+    {
+        if(connection == NULL || session == NULL)
+        {
+            throw NullPointerException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::ActiveMQTransaction - "
+                "Initialized with a NULL connection data");
+        }
+    
+        // Store State Data
+        this->connection = connection;
+        this->session    = session;
+        this->taskCount  = 0;
+            
+        // convert from property Strings to int.
+        redeliveryDelay = Integer::parseInt( 
+            properties.getProperty("transaction.redeliveryDelay", "25") );
+        maxRedeliveries = Integer::parseInt( 
+            properties.getProperty("transaction.maxRedeliveryCount", "5") );
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQTransaction::~ActiveMQTransaction(void)
+{
+    try
+    {
+        // Inform the connector we are rolling back before we close so that
+        // the provider knows we didn't complete this transaction
+        connection->getConnectionData()->getConnector()->
+            rollback(transactionInfo, session->getSessionInfo());
+
+        // Clean up
+        clearTransaction();
+        
+        // Must allow all the tasks to complete before we destruct otherwise
+        // the callbacks will cause an exception.
+        synchronized(&tasksDone)
+        {
+            while(taskCount != 0)
+            {
+                tasksDone.wait(1000);
+                
+                // TODO - Log Here to get some indication if we are stuck
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::clearTransaction(void)
+{
+    try
+    {
+        if(transactionInfo != NULL)
+        {
+            // Dispose of the ProducerInfo
+            connection->getConnectionData()->
+                getConnector()->destroyResource(transactionInfo);
+        }
+
+        synchronized(&rollbackLock)
+        {
+            // If there are any messages that are being transacted, then 
+            // they die once and for all here.
+            RollbackMap::iterator itr = rollbackMap.begin();
+            
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                MessageList::iterator msgItr = itr->second.begin();
+                
+                for(; msgItr != itr->second.end(); ++msgItr)
+                {
+                   delete *msgItr;
+                }
+            }
+
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::addToTransaction( ActiveMQMessage* message,
+                                            ActiveMQMessageListener* listener )
+{
+    synchronized(&rollbackLock)
+    {
+        // Store in the Multi Map
+        rollbackMap[listener].push_back(message);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::removeFromTransaction(
+    ActiveMQMessageListener* listener )
+{
+    try
+    {
+        // Delete all the messages, then remove the consumer's entry from
+        // the Rollback Map.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator rb_itr = rollbackMap.find( listener );
+            
+            if( rb_itr == rollbackMap.end() )
+            {
+                return;
+            }
+            
+            MessageList::iterator itr = rb_itr->second.begin();
+            
+            for(; itr != rollbackMap[listener].end(); ++itr)
+            {
+               delete *itr;
+            }
+            
+            // Erase the entry from the map
+            rollbackMap.erase(listener);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::commit(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {    
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::begin - "
+                "Commit called before transaction was started.");
+        }
+        
+        // Commit the current Transaction
+        connection->getConnectionData()->getConnector()->
+            commit( transactionInfo, session->getSessionInfo() );
+
+        // Clean out the Transaction
+        clearTransaction();
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::rollback(void) throw ( exceptions::ActiveMQException )
+{
+    try
+    {    
+        if(this->transactionInfo == NULL)
+        {
+            throw InvalidStateException(
+                __FILE__, __LINE__,
+                "ActiveMQTransaction::rollback - "
+                "Rollback called before transaction was started.");
+        }
+        
+        // Rollback the Transaction
+        connection->getConnectionData()->getConnector()->
+            rollback( transactionInfo, session->getSessionInfo() );
+
+        // Dispose of the ProducerInfo
+        connection->getConnectionData()->
+            getConnector()->destroyResource(transactionInfo);
+
+        // Start a new Transaction
+        transactionInfo = connection->getConnectionData()->
+            getConnector()->startTransaction( session->getSessionInfo() );
+
+        // Create a task for each consumer and copy its message list out
+        // to the Rollback task so we can clear the list for new messages
+        // that might come in next.
+        //  NOTE - This could be turned into a Thread so that the connection
+        //  doesn't have to wait on this method to complete an release its
+        //  mutex so it can dispatch new messages.  That would however requre
+        //  copying the whole map over to the thread.
+        synchronized(&rollbackLock)
+        {
+            RollbackMap::iterator itr = rollbackMap.begin();
+            
+            for(; itr != rollbackMap.end(); ++itr)
+            {
+                ThreadPool::getInstance()->queueTask(make_pair(
+                    new RollbackTask( itr->first,
+                                      connection,
+                                      session,
+                                      itr->second,
+                                      maxRedeliveries,
+                                      redeliveryDelay) , this));
+
+                // Count the tasks started.
+                taskCount++;
+
+            }
+            
+            // Clear the map.  Ownership of the messages is now handed off
+            // to the rollback tasks.
+            rollbackMap.clear();
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::onTaskComplete( Runnable* task )
+{
+    try
+    {
+        // Delete the task
+        delete task;
+        
+        taskCount--;
+        
+        if(taskCount == 0)
+        {
+            synchronized(&tasksDone)
+            {
+                tasksDone.notifyAll();
+            }
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+   
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::onTaskException( Runnable* task, 
+                                           exceptions::ActiveMQException& ex )
+{
+    try
+    {
+        // Delegate
+        onTaskComplete(task);
+        
+        // Route the Error
+        ExceptionListener* listener = connection->getExceptionListener();
+        
+        if(listener != NULL)
+        {
+            listener->onException( ex );
+        }
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQTransaction::RollbackTask::run(void)
+{
+    try
+    {        
+        MessageList::iterator itr = messages.begin();
+
+        for(; itr != messages.end(); ++itr)
+        {
+            (*itr)->setRedeliveryCount((*itr)->getRedeliveryCount() + 1);
+            
+            // Redeliver Messages at some point in the future
+            Thread::sleep(redeliveryDelay);
+            
+            if((*itr)->getRedeliveryCount() >= maxRedeliveries)
+            {
+                // Poison Ack the Message, we give up processing this one
+                connection->getConnectionData()->getConnector()->
+                    acknowledge( 
+                        session->getSessionInfo(), 
+                        dynamic_cast< Message* >(*itr), 
+                        Connector::PoisonAck );
+
+                // Won't redeliver this so we kill it here.
+                delete *itr;
+                
+                return;
+            }
+            
+            listener->onActiveMQMessage(*itr);
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/core/ActiveMQTransaction.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CORE_ACTIVEMQTRANSACTION_H_
+#define _ACTIVEMQ_CORE_ACTIVEMQTRANSACTION_H_
+
+#include <map>
+#include <list>
+
+#include <cms/Message.h>
+#include <cms/CMSException.h>
+
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/concurrent/TaskListener.h>
+#include <activemq/concurrent/Runnable.h>
+#include <activemq/connector/TransactionInfo.h>
+#include <activemq/exceptions/InvalidStateException.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+#include <activemq/util/Properties.h>
+#include <activemq/core/ActiveMQSessionResource.h>
+
+namespace activemq{
+namespace core{
+
+    class ActiveMQConnection;
+    class ActiveMQSession;
+    class ActiveMQMessage;
+    class ActiveMQMessageListener;
+
+    /**
+     * Transaction Management class, hold messages that are to be redelivered
+     * upon a request to rollback.  The Tranasction represents an always
+     * running transaction, when it is committed or rolled back it silently
+     * creates a new transaction for the next set of messages.  The only
+     * way to permanently end this tranaction is to delete it.
+     * 
+     * Configuration options
+     * 
+     * transaction.redeliveryDelay
+     *   Wait time between the redelivery of each message
+     * 
+     * transaction.maxRedeliveryCount
+     *   Max number of times a message can be redelivered, if the session is 
+     *   rolled back more than this many time, the message is dropped.
+     */                        
+    class ActiveMQTransaction : public concurrent::TaskListener,
+                                public connector::TransactionInfo,
+                                public ActiveMQSessionResource
+    {
+    private:
+    
+        // List type for holding messages
+        typedef std::list< ActiveMQMessage* > MessageList;
+                
+        // Mapping of MessageListener Ids to Lists of Messages that are
+        // redelivered on a Rollback
+        typedef std::map< ActiveMQMessageListener*, MessageList > RollbackMap;
+       
+    private:
+    
+        // Connection this Transaction is associated with
+        ActiveMQConnection* connection;
+        
+        // Session this Transaction is associated with
+        ActiveMQSession* session;        
+        
+        // Transaction Info for the current Transaction
+        connector::TransactionInfo* transactionInfo;
+        
+        // Map of ActiveMQMessageListener to Messages to Rollback
+        RollbackMap rollbackMap;
+        
+        // Lock object to protect the rollback Map
+        concurrent::Mutex rollbackLock;
+        
+        // Max number of redeliveries before we quit
+        int maxRedeliveries;
+        
+        // Wait time between sends of message on a rollback
+        int redeliveryDelay;
+        
+        // Mutex that is signaled when all tasks complete.
+        concurrent::Mutex tasksDone;
+        
+        // Count of Tasks that are outstanding
+        int taskCount;
+
+    public:
+    
+        /**
+         * Constructor
+         */
+    	ActiveMQTransaction( ActiveMQConnection* connection,
+                             ActiveMQSession* session,
+                             const util::Properties& properties );
+    
+        /**
+         * Destructor
+         */
+    	virtual ~ActiveMQTransaction(void);
+                                  
+        /**
+         * Adds the Message as a part of the Transaction for the specified
+         * ActiveMQConsumer.
+         * @param ActiveMQMessage
+         * @param ActiveMQMessageListener
+         */
+        virtual void addToTransaction( ActiveMQMessage* message,
+                                       ActiveMQMessageListener* listener );
+                                      
+        /**
+         * Removes the ActiveMQMessageListener and all of its transacted 
+         * messages from the Transaction, this is usually only done when 
+         * a ActiveMQMessageListener is destroyed.
+         * @param consumer who is to be removed.
+         */
+        virtual void removeFromTransaction( ActiveMQMessageListener* listener );
+        
+        /**
+         * Commit the current Transaction
+         * @throw CMSException
+         */
+        virtual void commit(void) throw ( exceptions::ActiveMQException );
+        
+        /**
+         * Rollback the current Transaction
+         * @throw CMSException
+         */
+        virtual void rollback(void) throw ( exceptions::ActiveMQException );
+        
+        /**
+         * Get the Transaction Information object for the current 
+         * Transaction, returns NULL if no transaction is running
+         * @return TransactionInfo
+         */
+        virtual connector::TransactionInfo* getTransactionInfo(void) const {
+            return transactionInfo;
+        }
+
+    public:   // TransactionInfo Interface
+
+        /**
+         * Gets the Transction Id
+         * @return unsigned int Id
+         */
+        virtual unsigned int getTransactionId(void) const {
+            return transactionInfo->getTransactionId();
+        }
+
+        /**
+         * Sets the Transction Id
+         * @param unsigned int Id
+         */
+        virtual void setTransactionId( const unsigned int id ) {
+            transactionInfo->setTransactionId( id );
+        }
+
+        /**
+         * Gets the Session Info that this transaction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const connector::SessionInfo* getSessionInfo(void) const {
+            return transactionInfo->getSessionInfo();
+        }
+
+        /**
+         * Gets the Session Info that this transaction is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const connector::SessionInfo* session ) {
+            transactionInfo->setSessionInfo( session );
+        }
+
+    protected:   // Task Listener Interface
+    
+        /**
+         * Called when a queued task has completed, the task that
+         * finished is passed along for user consumption.  The task is
+         * deleted and the count of outstanding tasks is reduced.
+         * @param Runnable Pointer to the task that finished
+         */
+        virtual void onTaskComplete( concurrent::Runnable* task );
+           
+         /**
+          * Called when a queued task has thrown an exception while
+          * being run.  The Callee should assume that this was an 
+          * unrecoverable exeption and that this task is now defunct.
+          * Deletes the Task and notifies the connection that the
+          * exception has occurred.  Reduce the outstanding task count.
+          * @param Runnable Pointer to the task
+          * @param The ActiveMQException that was thrown.
+          */
+         virtual void onTaskException( concurrent::Runnable* task, 
+                                       exceptions::ActiveMQException& ex );
+
+    public:  // ActiveMQSessionResource
+    
+        /**
+         * Retrieve the Connector resource that is associated with
+         * this Session resource.
+         * @return pointer to a Connector Resource, can be NULL
+         */
+        virtual connector::ConnectorResource* getConnectorResource(void) {
+            return transactionInfo;
+        }
+
+    protected:
+    
+        /**
+         * Clean out all Messages from the Rollback Map, deleting the 
+         * messages as it goes.  Destroys the Transaction Info object as 
+         * well.
+         * @throw ActiveMQException
+         */
+        virtual void clearTransaction(void);
+
+    private:
+    
+        // Internal class that is used to redeliver one consumers worth
+        // of messages from this transaction.
+        class RollbackTask : public concurrent::Runnable
+        {
+        private:
+        
+            // Wait time before redelivery in millisecs
+            int redeliveryDelay;
+            
+            // Max number of time to redeliver this message
+            int maxRedeliveries;
+
+            // Messages to Redeliver
+            MessageList messages;
+
+            // Consumer we are redelivering to
+            ActiveMQMessageListener* listener;
+            
+            // Connection to use for sending message acks
+            ActiveMQConnection* connection;
+            
+            // Session for this Transaction
+            ActiveMQSession* session;
+
+        public:
+
+            RollbackTask( ActiveMQMessageListener* listener,
+                          ActiveMQConnection* connection,
+                          ActiveMQSession* session,
+                          MessageList& messages,
+                          int maxRedeliveries,
+                          int redeliveryDelay ){
+                            
+                // Store State Data.
+                this->messages        = messages;
+                this->listener        = listener;
+                this->redeliveryDelay = redeliveryDelay;
+                this->maxRedeliveries = maxRedeliveries;
+                this->session         = session;
+                this->connection      = connection;
+            }
+
+            // Dispatches the Messages to the Consumer.
+            virtual void run(void);
+
+        };
+
+    };
+
+}}
+
+#endif /*_ACTIVEMQ_CORE_ACTIVEMQTRANSACTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <stdio.h>
+#include "ActiveMQException.h"
+#include <activemq/logger/LoggerDefines.h>
+
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQException::buildMessage(const char* format, va_list& vargs)
+{
+    // Allocate buffer with a guess of it's size
+    int size = 128;
+    
+    // Format string
+    while( true ){
+    	
+    	// Allocate a buffer of the specified size.
+    	char* buffer = new char[size];
+    	
+        int written = vsnprintf(buffer, size, format, vargs);
+        if (written > -1 && written < size-1) {
+            
+            // Guessed size was enough. Assign the string.
+            message.assign (buffer, written);
+            break;
+        }
+                
+        // Our buffer wasn't big enough - destroy the old buffer,
+        // double the size and try again.
+        delete [] buffer;
+        size *= 2;
+    }
+    
+    activemq::logger::SimpleLogger logger("com.yadda1");
+    logger.log( message );   
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQException::setMark( const char* file, const int lineNumber ){
+    
+    // Add this mark to the end of the stack trace.
+    stackTrace.push_back( std::make_pair( (std::string)file, (int)lineNumber ) );
+    
+    ostringstream stream;
+    stream << "\tFILE: " << stackTrace[stackTrace.size()-1].first;
+    stream << ", LINE: " << stackTrace[stackTrace.size()-1].second;
+                 
+    activemq::logger::SimpleLogger logger("com.yadda2");
+    logger.log( stream.str() );    
+}
+
+
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ActiveMQException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,178 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_ACTIVEMQEXCEPTION_H
+#define ACTIVEMQ_EXCEPTIONS_ACTIVEMQEXCEPTION_H
+
+#include <cms/CMSException.h>
+#include <activemq/exceptions/ExceptionDefines.h>
+#include <stdarg.h>
+#include <sstream>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Base class for all exceptions.
+    */
+   class ActiveMQException : public cms::CMSException
+   {
+   private:
+    
+      /**
+       * The cause of this exception.
+       */
+      std::string message;
+        
+      /**
+       * The stack trace.
+       */
+      std::vector< std::pair< std::string, int> > stackTrace;
+   
+   public:
+    
+      /**
+       * Default Constructor
+       */
+      ActiveMQException(void) {}
+       
+      /**
+       * Copy Constructor
+       */
+      ActiveMQException( const ActiveMQException& ex ){
+           *this = ex;
+      }
+       
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      ActiveMQException(const char* file, const int lineNumber, 
+           const char* msg, ...)
+      {
+         va_list vargs ;
+         va_start(vargs, msg) ;
+         buildMessage(msg, vargs) ;
+            
+         // Set the first mark for this exception.
+         setMark( file, lineNumber );
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~ActiveMQException(){}
+   
+      /**
+       * Gets the message for this exception.
+       */
+      virtual const char* getMessage() const{ return message.c_str(); }
+   
+      /**
+       * Sets the cause for this exception.
+       * @param msg the format string for the msg.
+       */
+      virtual void setMessage( const char* msg, ... ){
+          va_list vargs ;
+          va_start(vargs, msg) ;
+          buildMessage(msg, vargs) ;
+      }
+        
+      /**
+       * Adds a file/line number to the stack trace.
+       * @param file The name of the file calling this method (use __FILE__).
+       * @param lineNumber The line number in the calling file (use __LINE__).
+       */
+      virtual void setMark( const char* file, const int lineNumber );
+        
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone() const{
+          return new ActiveMQException( *this );
+      }
+        
+      /**
+       * Provides the stack trace for every point where
+       * this exception was caught, marked, and rethrown.  The first
+       * item in the returned vector is the first point where the mark
+       * was set (e.g. where the exception was created).  
+       * @return the stack trace.
+       */
+      virtual std::vector< std::pair< std::string, int> > getStackTrace() const{ 
+          return stackTrace; 
+      }
+        
+      /**
+       * Prints the stack trace to std::err
+       */
+      virtual void printStackTrace() const{
+          printStackTrace( std::cerr );
+      }
+        
+      /**
+       * Prints the stack trace to the given output stream.
+       * @param stream the target output stream.
+       */
+      virtual void printStackTrace( std::ostream& stream ) const{
+          stream << getStackTraceString();
+      }
+        
+      /**
+       * Gets the stack trace as one contiguous string.
+       */
+      virtual std::string getStackTraceString() const{
+            
+         // Create the output stream.
+         std::ostringstream stream;
+            
+         // Write the message and each stack entry.
+         stream << message << std::endl;
+         for( unsigned int ix=0; ix<stackTrace.size(); ++ix ){
+             stream << "\tFILE: " << stackTrace[ix].first;
+             stream << ", LINE: " << stackTrace[ix].second;
+             stream << std::endl;                    
+         }
+            
+         // Return the string from the output stream.
+         return stream.str();
+      }
+        
+      /**
+       * Assignment operator.
+       */
+      virtual ActiveMQException& operator =( const ActiveMQException& ex ){
+          this->message = ex.message;
+          this->stackTrace = ex.stackTrace;
+          return *this;
+      }
+        
+   protected:
+   
+      virtual void buildMessage(const char* format, va_list& vargs);
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_ACTIVEMQEXCEPTION_H*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/ExceptionDefines.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_EXCEPTIONS_EXCEPTIONDEFINES_H_
+#define _ACTIVEMQ_EXCEPTIONS_EXCEPTIONDEFINES_H_
+
+/**
+ * Macro for catching and rethrowing an exception of
+ * a given type.
+ * @param type The type of the exception to throw 
+ * (e.g. ActiveMQException ).
+ */
+#define AMQ_CATCH_RETHROW( type ) \
+    catch( type& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+        throw ex; \
+    }
+    
+/**
+ * Macro for catching an exception of one type and then rethrowing
+ * as another type.
+ * @param sourceType the type of the exception to be caught.
+ * @param targetType the type of the exception to be thrown.
+ */
+#define AMQ_CATCH_EXCEPTION_CONVERT( sourceType, targetType ) \
+    catch( sourceType& ex ){ \
+        targetType target( ex ); \
+        target.setMark( __FILE__, __LINE__ ); \
+        throw target; \
+    }
+
+/**
+ * A catch-all that throws a known exception.
+ * @param type the type of exception to be thrown.
+ */
+#define AMQ_CATCHALL_THROW( type ) \
+    catch( ... ){ \
+        type ex( __FILE__, __LINE__, \
+            "caught unknown exception" ); \
+        throw ex; \
+    }
+
+/**
+ * A catch-all that does not throw an exception, one use would
+ * be to catch any exception in a destructor and mark it, but not
+ * throw so that cleanup would continue as normal.
+ */
+#define AMQ_CATCHALL_NOTHROW( ) \
+    catch( ... ){ \
+        exceptions::ActiveMQException ex( __FILE__, __LINE__, \
+            "caught unknown exception, not rethrowing" ); \
+    }
+
+/**
+ * Macro for catching and rethrowing an exception of
+ * a given type.
+ * @param type The type of the exception to throw 
+ * (e.g. ActiveMQException ).
+ */
+#define AMQ_CATCH_NOTHROW( type ) \
+    catch( type& ex ){ \
+        ex.setMark( __FILE__, __LINE__ ); \
+    }
+
+#endif /*_ACTIVEMQ_EXCEPTIONS_EXCEPTIONDEFINES_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalArgumentException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalArgumentException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalArgumentException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalArgumentException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_ILLEGALARGUMENTEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_ILLEGALARGUMENTEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+    /*
+     * Thrown when an illegal argument was passed into a method.
+     */
+    class IllegalArgumentException : public ActiveMQException
+    {
+    public:
+    
+      /**
+       * Default Constructor
+       */
+      IllegalArgumentException(){};
+      
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      IllegalArgumentException( const ActiveMQException& ex ){
+         *(ActiveMQException*)this = ex;
+      }
+      
+      /**
+       * Copy Constructor
+       */
+      IllegalArgumentException( const IllegalArgumentException& ex ){
+         *(ActiveMQException*)this = ex;
+      }
+        
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      IllegalArgumentException(const char* file, const int lineNumber,
+         const char* msg, ...)
+      {
+         va_list vargs ;
+         va_start(vargs, msg) ;
+         buildMessage(msg, vargs) ;
+
+         // Set the first mark for this exception.
+         setMark( file, lineNumber );
+      }
+        
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone() const{
+         return new IllegalArgumentException( *this );
+      }
+      
+      /**
+       * Destructor
+       */
+      virtual ~IllegalArgumentException(){}
+        
+    };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_ILLEGALARGUMENTEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalMonitorStateException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalMonitorStateException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalMonitorStateException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/IllegalMonitorStateException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_ILLEGALMONITORSTATEEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_ILLEGALMONITORSTATEEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an error occurs from calling a method from syncronizable
+    * and the caller doesn't hold a lock on the object.
+    */
+   class IllegalMonitorStateException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+      IllegalMonitorStateException(void) {};
+
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      IllegalMonitorStateException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      IllegalMonitorStateException(const IllegalMonitorStateException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      IllegalMonitorStateException(const char* file, 
+                                   const int lineNumber,
+                                   const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new IllegalMonitorStateException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~IllegalMonitorStateException(void) {}
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_ILLEGALMONITORSTATEEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InterruptedException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InterruptedException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InterruptedException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InterruptedException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_INTERRUPTEDENTEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_INTERRUPTEDENTEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an Thread is interrupted during a wait.
+    */
+   class InterruptedException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+      InterruptedException(void) {};
+        
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      InterruptedException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      InterruptedException(const InterruptedException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      InterruptedException(const char* file, 
+                           const int lineNumber,
+                           const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new InterruptedException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~InterruptedException(void) {}
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_INTERRUPTEDENTEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InvalidStateException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InvalidStateException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InvalidStateException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/InvalidStateException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_EXCEPTIONS_INVALIDSTATEEXCEPTION_H_
+#define _ACTIVEMQ_EXCEPTIONS_INVALIDSTATEEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an operation is requested, but the state of the object
+    * servicing the request is not correct for that request.
+    */
+   class InvalidStateException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+   	  InvalidStateException(void) {}
+
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      InvalidStateException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      InvalidStateException(const InvalidStateException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      InvalidStateException(const char* file, 
+                            const int lineNumber,
+                            const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new InvalidStateException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+   	virtual ~InvalidStateException(void) {}
+      
+   };
+
+}}
+
+#endif /*_ACTIVEMQ_EXCEPTIONS_INVALIDSTATEEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NoSuchElementException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NoSuchElementException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NoSuchElementException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NoSuchElementException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,92 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_NOSUCHELEMENTEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_NOSUCHELEMENTEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown from an operation that attempts to access some element that does
+    * not exist.
+    */
+   class NoSuchElementException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+      NoSuchElementException(void) {};
+
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      NoSuchElementException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      NoSuchElementException(const NoSuchElementException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      NoSuchElementException(const char* file, 
+                             const int lineNumber,
+                             const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new NoSuchElementException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~NoSuchElementException(void) {}
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_NOSUCHELEMENTEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NullPointerException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NullPointerException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NullPointerException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/NullPointerException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,91 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_NULLPOINTERENTEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_NULLPOINTERENTEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an error occurs that involves a pointer being NULL
+    */
+   class NullPointerException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+      NullPointerException(void) {};
+
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      NullPointerException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      NullPointerException(const NullPointerException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      NullPointerException(const char* file, 
+                           const int lineNumber,
+                           const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new NullPointerException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~NullPointerException(void) {}
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_NULLPOINTERENTEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/RuntimeException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/RuntimeException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/RuntimeException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/RuntimeException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,93 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_RUNTIMEENTEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_RUNTIMEENTEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an error occurs that involves something in the run time
+    * This could be a memory allocation exception or some other generally
+    * unrecoverable exception.
+    */
+   class RuntimeException : public ActiveMQException
+   {
+   public:
+
+      /**
+       * Default Constructor
+       */
+      RuntimeException(void) {};
+
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      RuntimeException(const ActiveMQException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      RuntimeException(const RuntimeException& ex){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      RuntimeException(const char* file, 
+                       const int lineNumber,
+                       const char* msg, ...)
+      {
+         va_list vargs;
+         va_start(vargs, msg);
+         buildMessage(msg, vargs);
+            
+         // Set the first mark for this exception.
+         setMark(file, lineNumber);
+      }
+
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone(void) const{
+         return new RuntimeException(*this);
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~RuntimeException(void) {}
+
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_RUNTIMEENTEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/UnsupportedOperationException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/UnsupportedOperationException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/UnsupportedOperationException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/exceptions/UnsupportedOperationException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_EXCEPTIONS_UNSUPPORTEDOPERATIONEXCEPTION_H_
+#define ACTIVEMQ_EXCEPTIONS_UNSUPPORTEDOPERATIONEXCEPTION_H_
+
+#include <activemq/exceptions/ActiveMQException.h>
+
+namespace activemq{
+namespace exceptions{
+
+   /*
+    * Thrown when an unsupported method is called.
+    */
+   class UnsupportedOperationException : public ActiveMQException
+   {
+   public:
+   
+      /**
+       * Default Constructor
+       */
+      UnsupportedOperationException(void) {};
+      
+      /**
+       * Conversion Constructor from some other ActiveMQException
+       * @param An exception that should become this type of Exception
+       */
+      UnsupportedOperationException( const ActiveMQException& ex ){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Copy Constructor
+       */
+      UnsupportedOperationException( const UnsupportedOperationException& ex ){
+         *(ActiveMQException*)this = ex;
+      }
+
+      /**
+       * Constructor - Initializes the file name and line number where
+       * this message occured.  Sets the message to report, using an 
+       * optional list of arguments to parse into the message
+       * @param file name where exception occurs
+       * @param line number where the exception occurred.
+       * @param message to report
+       * @param list of primitives that are formatted into the message
+       */
+      UnsupportedOperationException(const char* file, const int lineNumber,
+         const char* msg, ...)
+      {
+         va_list vargs ;
+         va_start(vargs, msg) ;
+         buildMessage(msg, vargs) ;
+            
+         // Set the first mark for this exception.
+         setMark( file, lineNumber );
+      }
+        
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual ActiveMQException* clone() const{
+         return new UnsupportedOperationException( *this );
+      }
+
+      /**
+       * Destructor
+       */
+      virtual ~UnsupportedOperationException(){}
+        
+   };
+
+}}
+
+#endif /*ACTIVEMQ_EXCEPTIONS_UNSUPPORTEDOPERATIONEXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "BufferedInputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::BufferedInputStream( InputStream* stream )
+{
+	// Default to a 1k buffer.
+	init( stream, 1024 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::BufferedInputStream( InputStream* stream, 
+	const int bufferSize )
+{
+	init( stream, bufferSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::~BufferedInputStream()
+{
+    // Destroy the buffer.
+    if( buffer != NULL ){
+        delete [] buffer;
+        buffer = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::init( InputStream* stream, const int bufferSize ){
+	
+	this->stream = stream;
+	this->bufferSize = bufferSize;
+	
+	// Create the buffer and initialize the head and tail positions.
+	buffer = new unsigned char[bufferSize];
+	head = 0;
+	tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::close() throw(cms::CMSException){
+	
+	// Close the delegate stream.
+	stream->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned char BufferedInputStream::read() throw (IOException){
+	
+	// If we don't have any data buffered yet - read as much as we can.	
+	if( tail == head ){
+		bufferData();
+	}
+	
+	// Get the next character.
+	char returnValue = buffer[head++];
+	
+	// If the buffer is now empty - reset it to the beginning of the buffer.
+	if( tail == head ){
+		tail = head = 0;
+	}
+	
+	return returnValue;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int BufferedInputStream::read( unsigned char* buffer, 
+	const int bufferSize ) throw (IOException){
+	
+	// If we still haven't filled the output buffer AND there is data
+	// on the input stream to be read, read a buffer's
+	// worth from the stream.
+	int totalRead = 0;
+	while( totalRead < bufferSize ){		
+		
+		// Get the remaining bytes to copy.
+		int bytesToCopy = min( tail-head, (bufferSize-totalRead) );
+		
+		// Copy the data to the output buffer.	
+		memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
+		
+		// Increment the total bytes read.
+		totalRead += bytesToCopy;
+		
+		// Increment the head position.  If the buffer is now empty,
+		// reset the positions and buffer more data.
+		head += bytesToCopy;
+		if( head == tail ){
+			
+			// Reset the buffer indicies.
+			head = tail = 0;
+			
+			// If there is no more data currently available on the 
+			// input stream, stop the loop.
+			if( stream->available() == 0 ){
+				break;
+			}
+			
+			// Buffer as much data as we can.
+			bufferData();
+		}				
+	}
+	
+	// Return the total number of bytes read.
+	return totalRead;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::bufferData() throw (IOException){
+	
+	if( tail == bufferSize ){
+		throw IOException( __FILE__, __LINE__, 
+            "BufferedInputStream::bufferData - buffer full" );
+	}
+	
+	// Read in as many bytes as we can.
+	int bytesRead = stream->read( buffer+tail, bufferSize-tail );
+	if( bytesRead == 0 ){
+		throw IOException( __FILE__, __LINE__, 
+            "BufferedInputStream::read() - failed reading bytes from stream");
+	}
+	
+	// Increment the tail to the new end position.
+	tail += bytesRead;
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedInputStream.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_
+ 
+#include <activemq/io/InputStream.h>
+#include <assert.h>
+
+namespace activemq{
+namespace io{
+      
+   /**
+    * A wrapper around another input stream that performs
+    * a buffered read, where it reads more data than it needs
+    * in order to reduce the number of io operations on the
+    * input stream.
+    */
+   class BufferedInputStream : public InputStream
+   {
+   private:
+   
+      /**
+       * The target input stream.
+       */
+      InputStream* stream;
+      
+      /**
+       * The internal buffer.
+       */
+      unsigned char* buffer;
+      
+      /**
+       * The buffer size.
+       */
+      int bufferSize;
+      
+      /**
+       * The current head of the buffer.
+       */
+      int head;
+      
+      /**
+       * The current tail of the buffer.
+       */
+      int tail;
+      
+   public:
+   
+      /**
+       * Constructor
+       * @param stream The target input stream.
+       */
+      BufferedInputStream( InputStream* stream );
+      
+      /**
+       * Constructor
+       * @param stream the target input stream
+       * @param bufferSize the size for the internal buffer.
+       */
+      BufferedInputStream( InputStream* stream, const int bufferSize );
+      
+      /**
+       * Destructor.
+       */
+      virtual ~BufferedInputStream();
+      
+      /**
+       * Locks the object.
+       */
+      virtual void lock() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->lock();
+      }
+   
+      /**
+       * Unlocks the object.
+       */
+      virtual void unlock() throw(exceptions::ActiveMQException){   
+         assert( stream != NULL );
+         stream->unlock();
+      }
+       
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.
+       */
+      virtual void wait() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->wait();
+      }
+    
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.  This wait will timeout after the specified time
+       * interval.
+       * @param time in millisecsonds to wait, or WAIT_INIFINITE
+       * @throws ActiveMQException
+       */
+      virtual void wait(unsigned long millisecs) 
+         throw(exceptions::ActiveMQException) {
+         
+         assert( stream != NULL );
+         stream->wait(millisecs);
+      }
+
+      /**
+       * Signals a waiter on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       */
+      virtual void notify() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->notify();
+      }
+        
+      /**
+       * Signals the waiters on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       */
+      virtual void notifyAll() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->notifyAll();
+      }
+    
+      /**
+       * Indcates the number of bytes avaialable.
+       * @return the sum of the amount of data avalable
+       * in the buffer and the data available on the target
+       * input stream.
+       */
+      virtual int available() const{   
+         return (tail-head)+stream->available();
+      }
+            
+      /**
+       * Reads a single byte from the buffer.
+       * @return The next byte.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual unsigned char read() throw (IOException);
+      
+      /**
+       * Reads an array of bytes from the buffer.
+       * @param buffer (out) the target buffer.
+       * @param bufferSize the size of the output buffer.
+       * @return The number of bytes read.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual int read( unsigned char* buffer, const int bufferSize ) throw (IOException);
+      
+      /**
+       * Closes the target input stream.
+       */
+      virtual void close(void) throw(cms::CMSException);
+      
+   private:
+   
+      /**
+       * Initializes the internal structures.
+       */
+      void init( InputStream* stream, const int bufferSize );
+      
+      /**
+       * Populates the buffer with as much data as possible
+       * from the target input stream.
+       */
+      void bufferData(void) throw (IOException);
+
+   };
+   
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDINPUTSTREAM_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,121 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "BufferedOutputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream )
+{
+	// Default to 1k buffer.
+	init( stream, 1024 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::BufferedOutputStream( OutputStream* stream, 
+	const int bufSize )
+{
+	init( stream, bufSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedOutputStream::~BufferedOutputStream()
+{
+    // Destroy the buffer.
+    if( buffer != NULL ){
+        delete [] buffer;
+        buffer = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::init( OutputStream* stream, const int bufSize ){
+	
+	this->stream = stream;
+	this->bufferSize = bufSize;
+	
+	buffer = new unsigned char[bufSize];
+	head = tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::close() throw(cms::CMSException){
+	
+	// Flush this stream.
+	flush();	
+	
+	// Close the delegate stream.
+	stream->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::emptyBuffer() throw (IOException){
+	
+	if( head != tail ){
+		stream->write( buffer+head, tail-head );
+	}
+	head = tail = 0;
+}
+		
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::flush() throw (IOException){
+	
+	// Empty the contents of the buffer to the output stream.
+	emptyBuffer();
+	
+	// Flush the output stream.
+	stream->flush();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedOutputStream::write( const unsigned char c ) throw (IOException){
+	
+	if( tail >= bufferSize ){
+		emptyBuffer();
+	}
+	
+	buffer[tail++] = c;	
+}
+
+////////////////////////////////////////////////////////////////////////////////		
+void BufferedOutputStream::write( const unsigned char* buffer, const int len ) 
+	throw (IOException)
+{		
+	// Iterate until all the data is written.
+	for( int pos=0; pos < len; ){
+		
+		if( tail >= bufferSize ){
+			emptyBuffer();
+		}
+	
+		// Get the number of bytes left to write.
+		int bytesToWrite = min( bufferSize-tail, len-pos );
+		
+		// Copy the data.
+		memcpy( this->buffer+tail, buffer+pos, bytesToWrite );
+		
+		// Increase the tail position.
+		tail += bytesToWrite;
+		
+		// Decrease the number of bytes to write.
+		pos += bytesToWrite;	
+	}	
+}
+

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/BufferedOutputStream.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,181 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+#define ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_
+ 
+#include <activemq/io/OutputStream.h>
+#include <assert.h>
+
+namespace activemq{
+namespace io{
+   
+   /**
+    * Wrapper around another output stream that buffers
+    * output before writing to the target output stream.
+    */
+   class BufferedOutputStream : public OutputStream
+   {
+   private:
+   
+      /**
+       * The target output stream.
+       */
+      OutputStream* stream;
+      
+      /**
+       * The internal buffer.
+       */
+      unsigned char* buffer;
+      
+      /**
+       * The size of the internal buffer.
+       */
+      int bufferSize;
+      
+      /**
+       * The current head of the buffer.
+       */
+      int head;
+      
+      /**
+       * The current tail of the buffer.
+       */
+      int tail;
+      
+   public:
+   
+      /**
+       * Constructor.
+       * @param stream the target output stream.
+       */
+      BufferedOutputStream( OutputStream* stream );
+      
+      /**
+       * Constructor
+       * @param stream the target output stream.
+       * @param bufSize the size for the internal buffer.
+       */
+      BufferedOutputStream( OutputStream* stream, const int bufSize );
+      
+      /**
+       * Destructor
+       */
+      virtual ~BufferedOutputStream();
+      
+      /**
+       * Locks the object.
+       */
+      virtual void lock() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->lock();
+      }
+   
+      /**
+       * Unlocks the object.
+       */
+      virtual void unlock() throw(exceptions::ActiveMQException){   
+         assert( stream != NULL );
+         stream->unlock();
+      }
+       
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.
+       */
+      virtual void wait() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->wait();
+      }
+    
+      /**
+       * Waits on a signal from this object, which is generated
+       * by a call to Notify.  Must have this object locked before
+       * calling.  This wait will timeout after the specified time
+       * interval.
+       * @param time in millisecsonds to wait, or WAIT_INIFINITE
+       * @throws ActiveMQException
+       */
+      virtual void wait(unsigned long millisecs) 
+         throw(exceptions::ActiveMQException) {
+         
+         assert( stream != NULL );
+         stream->wait(millisecs);
+      }
+
+      /**
+       * Signals a waiter on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       */
+      virtual void notify() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->notify();
+      }
+        
+      /**
+       * Signals the waiters on this object that it can now wake
+       * up and continue.  Must have this object locked before
+       * calling.
+       */
+      virtual void notifyAll() throw(exceptions::ActiveMQException){
+         assert( stream != NULL );
+         stream->notifyAll();
+      }
+       
+       /**
+       * Writes a single byte to the output stream.
+       * @param c the byte.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual void write( const unsigned char c ) throw (IOException);
+      
+      /**
+       * Writes an array of bytes to the output stream.
+       * @param buffer The array of bytes to write.
+       * @param len The number of bytes from the buffer to be written.
+       * @throws IOException thrown if an error occurs.
+       */
+      virtual void write( const unsigned char* buffer, const int len ) throw (IOException);
+      
+      /**
+       * Invokes flush on the target output stream.
+       */
+      virtual void flush() throw (IOException);
+      
+      /**
+       * Invokes close on the target output stream.
+       */
+      void close() throw(cms::CMSException);
+      
+   private:
+   
+      /**
+       * Initializes the internal structures.
+       */
+      void init( OutputStream* stream, const int bufSize );
+      
+      /**
+       * Writes the contents of the buffer to the output stream.
+       */
+      void emptyBuffer() throw (IOException);
+   };
+
+}}
+
+#endif /*ACTIVEMQ_IO_BUFFEREDOUTPUTSTREAM_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/io/ByteArrayInputStream.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "ByteArrayInputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+ByteArrayInputStream::ByteArrayInputStream()
+{
+   pos = buffer.end();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ByteArrayInputStream::ByteArrayInputStream( const unsigned char* buffer,
+                                            int bufferSize )
+{
+   setByteArray( buffer, bufferSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ByteArrayInputStream::~ByteArrayInputStream(void)
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ByteArrayInputStream::setByteArray( const unsigned char* buffer,
+                                         int bufferSize )
+{
+   // Remove old data
+   this->buffer.clear();
+   
+   // Copy data to internal buffer.
+   for( int ix = 0; ix < bufferSize; ++ix )
+   {
+      this->buffer.push_back(buffer[ix]);
+   }
+   
+   // Begin at the Beginning.
+   pos = this->buffer.begin();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ByteArrayInputStream::close() throw(cms::CMSException){
+	
+	// Close the delegate stream.
+	buffer.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned char ByteArrayInputStream::read() throw (IOException)
+{
+   if(pos != buffer.end())
+   {
+      return *(pos++);
+   }
+   
+   throw IOException( __FILE__, __LINE__, 
+      "ByteArrayInputStream::read: Out of Data");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int ByteArrayInputStream::read( unsigned char* buffer, 
+	                             const int bufferSize ) 
+                                   throw (IOException)
+{
+   int ix = 0;
+   
+   for( ; ix < bufferSize; ++ix, ++pos)
+   {
+      if(pos == this->buffer.end())
+      {        
+         break;
+      }
+      
+      buffer[ix] = *(pos);
+   }
+   
+   return ix;
+}



Mime
View raw message