activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jstrac...@apache.org
Subject svn commit: r382028 [1/4] - in /incubator/activemq/trunk/cms: ./ activemqcms/ activemqcms/src/ activemqcms/src/activemq/ activemqcms/src/activemq/concurrent/ activemqcms/src/activemq/io/ activemqcms/src/activemq/transport/ activemqcms/src/activemq/tran...
Date Wed, 01 Mar 2006 14:28:20 GMT
Author: jstrachan
Date: Wed Mar  1 06:27:46 2006
New Revision: 382028

URL: http://svn.apache.org/viewcvs?rev=382028&view=rev
Log:
Applied Nathan's C++ API for ActiveMQ

Added:
    incubator/activemq/trunk/cms/
    incubator/activemq/trunk/cms/activemqcms/
    incubator/activemq/trunk/cms/activemqcms/src/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedOutputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/IOException.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/InputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/OutputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/Socket.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketException.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/io/SocketStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TopicListener.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/Transport.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/TransportFactory.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/AggregateProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/BytesProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ConnectedProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DestinationPool.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/DisconnectProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ErrorProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/ProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompBytesMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompFrame.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompIO.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompInputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompMessageListener.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompOutputStream.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTextMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransport.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.cpp   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/StompTransportFactory.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/SubscribeMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/SubscribeProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/TextProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/TransactionMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/UnsubscribeMessage.h   (with props)
    incubator/activemq/trunk/cms/activemqcms/src/activemq/transport/stomp/UnsubscribeProtocolAdapter.h   (with props)
    incubator/activemq/trunk/cms/cms/
    incubator/activemq/trunk/cms/cms/src/
    incubator/activemq/trunk/cms/cms/src/cms/
    incubator/activemq/trunk/cms/cms/src/cms/BytesMessage.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/CMSException.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Closeable.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Connection.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/ConnectionFactory.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Destination.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/ExceptionListener.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Message.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/MessageConsumer.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/MessageListener.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/MessageProducer.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Queue.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Service.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Session.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TextMessage.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Topic.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TopicConnection.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TopicConnectionFactory.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TopicPublisher.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TopicSession.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TopicSubscriber.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/TransactionController.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/XAResource.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/XASession.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/XATopicSession.h   (with props)
    incubator/activemq/trunk/cms/cms/src/cms/Xid.h   (with props)
    incubator/activemq/trunk/cms/docs/
    incubator/activemq/trunk/cms/docs/cms_overview.pdf   (with props)
    incubator/activemq/trunk/cms/test/
    incubator/activemq/trunk/cms/test/main.cpp   (with props)

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQBYTESMESSAGE_H_
+#define ACTIVEMQ_ACTIVEMQBYTESMESSAGE_H_
+
+#include <cms/BytesMessage.h>
+
+namespace activemq{
+		
+    /**
+     * Simple implementation of the bytes message interface.
+     * @author Nathan Mittler
+     */
+	class ActiveMQBytesMessage : public cms::BytesMessage
+	{
+	public:
+	
+		ActiveMQBytesMessage(){
+			data = NULL;
+			numBytes = 0;
+		}
+		virtual ~ActiveMQBytesMessage(){			
+			clearData();
+		};
+		
+		virtual void setData( const char* data, const int numBytes ) throw(cms::CMSException){
+			clearData();
+			
+			char* buf = new char[numBytes];
+			memcpy( buf, data, numBytes );
+			this->data = buf;
+			this->numBytes = numBytes;			
+		}
+		
+		virtual int getNumBytes() const{
+			return numBytes;
+		}
+		
+		virtual const char* getData() const{
+			return data;
+		}
+		
+		virtual void acknowledge() throw( cms::CMSException ){		
+		}
+        
+        /**
+         * Clones this message.
+         * @return a copy of this message.  The caller is responsible
+         * for freeing this memory.
+         */
+        virtual cms::Message* clone() const{
+            
+            ActiveMQBytesMessage* newMsg = new ActiveMQBytesMessage();
+            newMsg->setData( data, numBytes );
+            return newMsg;
+        }
+		
+	protected:
+	
+		void clearData(){
+			if( data != NULL ){
+				delete [] data;
+				data = NULL;
+			}
+		}
+		
+	private:
+
+		int numBytes;
+		char* data;
+	};
+
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQBYTESMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQBytesMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,103 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConnection.h"
+#include "ActiveMQException.h"
+#include "ActiveMQSession.h"
+#include "transport/Transport.h"
+#include <cms/ExceptionListener.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection::ActiveMQConnection( Transport* transport )
+{
+	this->transport = transport;
+	this->transport->setExceptionListener( this );
+    this->exceptionListener = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnection::~ActiveMQConnection()
+{
+	close();
+    
+    if( transport != NULL ){
+        transport->setExceptionListener( NULL );
+        delete transport;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::start() throw( CMSException ){
+        
+    if( transport != NULL ){
+        
+        // Connect if necessary and start the flow of messages.
+        transport->start();    
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::stop() throw( CMSException ){
+
+    if( transport != NULL ){
+        
+        // Stop the flow of messages.
+    	transport->stop();
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::close() throw( CMSException )
+{
+	if( transport != NULL ){
+        
+        // Disconnect from the broker
+        transport->close();
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////	
+TopicSession* ActiveMQConnection::createTopicSession( 
+	const bool transacted, 
+	const Session::AcknowledgeMode acknowledgeMode ) 
+	throw( CMSException )
+{	
+	if( !transacted ){
+			
+		ActiveMQSession* session = new ActiveMQSession( this, acknowledgeMode );
+		return session;
+	}
+	
+	return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQConnection::onException( const CMSException* exception ){
+	
+	if( exceptionListener ){
+		exceptionListener->onException( exception );
+	}
+}
+
+
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,157 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQCONNECTION_H_
+#define ACTIVEMQ_ACTIVEMQCONNECTION_H_
+ 
+#include <cms/TopicConnection.h>
+#include <cms/TopicSession.h>
+#include <cms/ExceptionListener.h>
+#include <activemq/transport/Transport.h>
+#include <vector>
+
+namespace activemq{
+	
+	/**
+	 * Defines a connection for interfacing with an
+	 * ActiveMQ broker.
+	 * @author Nathan Mittler
+	 */
+	class ActiveMQConnection 
+	: 
+		public cms::ExceptionListener,
+		public cms::TopicConnection
+	{
+	public:
+	
+		/**
+		 * Constructor
+		 * @param transport The transport channel.
+		 */
+		ActiveMQConnection( transport::Transport* transport );
+			
+		/**
+		 * Destructor.
+		 */
+		virtual ~ActiveMQConnection();
+		
+		/**
+		 * Begins the dispatch of messages on this connection.
+		 */
+		virtual void start() throw( cms::CMSException );
+		
+		/**
+		 * Stops the dispatch of incoming messages to listeners.  Calling
+		 * start again will resume the flow of messages.
+		 */
+		virtual void stop() throw( cms::CMSException );
+		
+		/**
+		 * Closes this connection and all child sessions.
+		 */
+		virtual void close() throw( cms::CMSException );	
+		
+		/**
+		 * Delegates to createTopicSession().
+		 */
+		virtual cms::Session* createSession( const bool transacted,
+			const cms::Session::AcknowledgeMode acknowledgeMode = cms::Session::AUTO_ACKNOWLEDGE ) 
+			throw( cms::CMSException )
+		{
+			return createTopicSession( transacted, acknowledgeMode );
+		}
+			
+		/**
+		 * Creates a topic session for this connection.
+		 */
+		virtual cms::TopicSession* createTopicSession( const bool transacted, 
+			const cms::Session::AcknowledgeMode acknowledgeMode = cms::Session::AUTO_ACKNOWLEDGE) 
+			throw( cms::CMSException );		
+			
+		/**
+		 * Sets the listener to exceptions of this connection.
+		 */
+		virtual void setExceptionListener( cms::ExceptionListener* listener )
+			throw( cms::CMSException )
+		{
+			exceptionListener = listener;
+		}
+		
+		/**
+		 * Gets the listener of exceptions of this connection.
+		 */
+		virtual cms::ExceptionListener* getExceptionListener() const
+			throw( cms::CMSException )
+		{
+			return exceptionListener;
+		}		
+		
+		virtual const char* getUserName() const{
+			return userName.c_str();
+		}
+		
+		virtual const char* getPassword() const{
+			return password.c_str();
+		}
+		
+		/**
+		 * Gets the transport channel for this connection.
+		 */
+		virtual transport::Transport* getTransportChannel(){
+			return transport;
+		}
+		
+		/**
+		 * Gets the transport channel for this connection.
+		 */
+		virtual const transport::Transport* getTransportChannel() const{
+			return transport;
+		}
+		
+        /**
+         * Called by the transport layer when an exception occurs.
+         * @param exception The exception.
+         */
+		virtual void onException( const cms::CMSException* exception );
+		
+	private:	
+		
+		/**
+		 * The transport channel.
+		 */
+		transport::Transport* transport;
+		
+		/**
+		 * The user name for connecting to the broker.
+		 */
+		std::string userName;
+		
+		/**
+		 * The password for connecting to the broker.
+		 */
+		std::string password;
+		
+		/**
+		 * The listener of exceptions from this connection.
+		 */
+		cms::ExceptionListener* exceptionListener;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQCONNECTION_H_ */
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnection.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,98 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQConnectionFactory.h"
+#include "ActiveMQConnection.h"
+#include "transport/stomp/StompTransportFactory.h"
+#include <stdio.h>
+#include <sstream>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionFactory::ActiveMQConnectionFactory()
+{	
+    transportFactory = new activemq::transport::stomp::StompTransportFactory();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionFactory::ActiveMQConnectionFactory( 
+	const char* brokerUrl ) throw( CMSException )
+{
+    this->brokerUrl = brokerUrl;
+    transportFactory = new activemq::transport::stomp::StompTransportFactory();		
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionFactory::ActiveMQConnectionFactory( const char* userName, 
+	const char* password, 
+	const char* brokerUrl ) throw( CMSException )
+{
+	this->userName = userName;
+	this->password = password;
+
+    brokerUrl = brokerUrl;
+    
+    transportFactory = new activemq::transport::stomp::StompTransportFactory();
+}
+			
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQConnectionFactory::~ActiveMQConnectionFactory()
+{
+    if( transportFactory != NULL ){
+        delete transportFactory;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Connection* ActiveMQConnectionFactory::createConnection() throw( CMSException ){
+	return createTopicConnection();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Connection* ActiveMQConnectionFactory::createConnection( 
+	const char* userName, 
+	const char* password ) throw( CMSException )
+{		
+	return createTopicConnection( userName, password );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TopicConnection* ActiveMQConnectionFactory::createTopicConnection()
+	throw( CMSException )
+{	
+    Transport* transport = transportFactory->createTransport( brokerUrl.c_str() );
+    
+    return new ActiveMQConnection( transport );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TopicConnection* ActiveMQConnectionFactory::createTopicConnection( 
+	const char* userName, 
+	const char* password ) throw( CMSException )
+{        
+    Transport* transport = transportFactory->createTransport( brokerUrl.c_str(),
+        userName, 
+        password );
+    
+    return new ActiveMQConnection( transport );
+}
+
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQCONNECTIONFACTORY_H_
+#define ACTIVEMQ_ACTIVEMQCONNECTIONFACTORY_H_
+
+#include <cms/TopicConnectionFactory.h>
+#include <activemq/transport/TransportFactory.h>
+#include <string>
+
+namespace activemq{
+	
+	/**
+	 * This is the implementation of the connection factory interfaces for
+	 * use with the ActiveMQ stomp implementation.
+	 * @author Nathan Mittler
+	 */
+	class ActiveMQConnectionFactory : public cms::TopicConnectionFactory
+	{		
+	public:
+	
+		/**
+		 * Default constructor.
+		 */
+		ActiveMQConnectionFactory();
+		
+		/**
+		 * Constructor for connecting to the broker with a default login.
+		 * @param brokerUrl The URL of the stomp broker.
+		 */
+		ActiveMQConnectionFactory( const char* brokerUrl ) throw( cms::CMSException );
+		
+		/**
+		 * Constructor with full argument list.
+		 * @param userName The user name for connecting to the stomp broker.
+		 * @param password The password for connecting to the stomp broker.
+		 * @param brokerUrl The url of the stomp broker.
+		 */
+		ActiveMQConnectionFactory( const char* userName, const char* password, 
+			const char* brokerUrl ) throw( cms::CMSException );
+			
+		/**
+		 * Destructor.
+		 */
+		virtual ~ActiveMQConnectionFactory();
+		
+		/**
+		 * Creates a topic connection with the default user identity.
+		 */
+		virtual cms::Connection* createConnection() throw( cms::CMSException );
+		
+		/**
+		 * Creates a topic connection with the specified user identity.
+		 */
+		virtual cms::Connection* createConnection( 
+			const char* userName, 
+			const char* password ) throw( cms::CMSException );
+			
+		/**
+		 * Creates a topic connection with the default user identity.
+		 */
+		virtual cms::TopicConnection* createTopicConnection() throw( cms::CMSException );
+		
+		/**
+		 * Creates a topic connection with the specified user identity.
+		 */
+		virtual cms::TopicConnection* createTopicConnection( 
+			const char* userName, 
+			const char* password ) throw( cms::CMSException );
+	
+	private:
+	
+        /**
+         * The url of the broker.
+         */
+        std::string brokerUrl;
+		
+		/**
+		 * The user name for connecting to the broker.
+		 */
+		std::string userName;
+		
+		/**
+		 * The password for connecting to the broker.
+		 */
+		std::string password;
+        
+        /**
+         * Factory for transport objects.
+         */
+        transport::TransportFactory* transportFactory;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQCONNECTIONFACTORY_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQConnectionFactory.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_STOMPEXCEPTION_H_
+#define ACTIVEMQ_STOMPEXCEPTION_H_
+
+#include <cms/CMSException.h>
+#include <string>
+
+namespace activemq{
+	
+	class ActiveMQException : public cms::CMSException{
+		
+	public:
+	
+		ActiveMQException( const ActiveMQException& ex ){
+			(*this)=ex;
+		}
+		
+		ActiveMQException( const char* msg ){
+			this->msg = msg;
+		}
+		
+		ActiveMQException( const std::string msg ){
+			this->msg = msg;
+		}
+		
+		virtual ~ActiveMQException(){}
+		
+		/**
+		 * @return The error message.
+		 */
+		virtual const char* getMessage() const{
+			
+			return msg.c_str();
+		}
+		
+	private:
+		
+		std::string msg;
+	};
+}
+
+#endif /*ACTIVEMQ_STOMPEXCEPTION_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQException.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQPublisher.h"
+#include "ActiveMQTopic.h"
+#include "ActiveMQSession.h"
+#include "ActiveMQConnection.h"
+#include "transport/Transport.h"
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace cms;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQPublisher::ActiveMQPublisher( const Topic* topic, 
+	ActiveMQSession* session )
+{
+	this->topic = topic;
+	this->session = session;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQPublisher::~ActiveMQPublisher()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQPublisher::publish( Message* message ) 
+	throw( CMSException ){
+	
+	publish( topic, message );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQPublisher::publish( const Topic* topic, 
+	Message* message ) throw( CMSException )
+{    
+    
+    if( topic == NULL ){        
+        throw ActiveMQException( "ActiveMQPublisher::publish() - invalid topic" );
+    }
+    
+    if( message == NULL ){
+        throw ActiveMQException( "ActiveMQPublisher::publish() - invalid message" );
+    }
+    
+    // Get the transport object.
+    Transport* transport = session->getConnection()->getTransportChannel();
+    if( transport == NULL ){
+        throw ActiveMQException( "ActiveMQPublisher::publish() - invalid transport layer" );
+    }
+    
+    // Send the message.
+    transport->sendMessage( topic, message );
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQPUBLISHER_H_
+#define ACTIVEMQ_ACTIVEMQPUBLISHER_H_
+
+#include <cms/TopicPublisher.h>
+#include <cms/Topic.h>
+
+namespace activemq{
+	
+	// Forward declarations.
+	class ActiveMQSession;
+	
+    /**
+     * Basic implementation of the topic publisher interface.
+     * @author Nathan Mittler
+     */
+	class ActiveMQPublisher : public cms::TopicPublisher
+	{
+	public:
+		ActiveMQPublisher( const cms::Topic* topic, 
+			ActiveMQSession* session );
+		virtual ~ActiveMQPublisher();
+		
+		/**
+		 * Gets the topic associated with this TopicPublisher.
+		 */
+		virtual const cms::Topic* getTopic() const throw( cms::CMSException ){
+			return topic;
+		}
+		
+		/**
+		 * Publishes a message to the topic.
+		 * @param message the message to publish
+		 */
+		virtual void publish( cms::Message* message ) 
+			throw( cms::CMSException );
+		
+		/**
+		 * Publishes a message to a topic for an unidentified message producer.
+		 * @param topic The topic to publish this message to.
+		 * @param message The message to publish
+		 */
+		virtual void publish( const cms::Topic* topic, 
+			cms::Message* message ) throw( cms::CMSException );
+						
+	private:
+	
+		const cms::Topic* topic;
+		ActiveMQSession* session;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQPUBLISHER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQPublisher.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,95 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQSession.h"
+#include "ActiveMQConnection.h"
+#include "ActiveMQTopic.h"
+#include "ActiveMQTextMessage.h"
+#include "ActiveMQBytesMessage.h"
+#include "ActiveMQPublisher.h"
+#include "ActiveMQSubscriber.h"
+
+using namespace activemq;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSession::ActiveMQSession( ActiveMQConnection* connection,
+	const Session::AcknowledgeMode acknowledgeMode )
+{
+	this->connection = connection;
+	this->transacted = transacted;
+	this->acknowledgeMode = acknowledgeMode;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSession::~ActiveMQSession()
+{
+	close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSession::close() throw( CMSException ){
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TextMessage* ActiveMQSession::createTextMessage() throw( CMSException ){
+	ActiveMQTextMessage* textMsg = new ActiveMQTextMessage();
+	return textMsg;
+}
+
+////////////////////////////////////////////////////////////////////////////////	
+TextMessage* ActiveMQSession::createTextMessage( const char* msg ) throw( CMSException )
+{
+	ActiveMQTextMessage* textMsg = new ActiveMQTextMessage();
+	textMsg->setText( msg );
+	return textMsg;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BytesMessage* ActiveMQSession::createBytesMessage() throw( CMSException ){
+	ActiveMQBytesMessage* bMsg = new ActiveMQBytesMessage();
+	return bMsg;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TopicPublisher* ActiveMQSession::createPublisher( const Topic* topic ) 
+	throw( CMSException )
+{
+	return new ActiveMQPublisher( topic, this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TopicSubscriber* ActiveMQSession::createSubscriber( const Topic* topic ) 
+	throw( CMSException )
+{
+	ActiveMQSubscriber* subscriber = new ActiveMQSubscriber( topic, this );	
+	return subscriber;	
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Topic* ActiveMQSession::createTopic( const char* topicName ) 
+	throw( CMSException )
+{
+	return new ActiveMQTopic( topicName );
+}
+
+
+
+
+			
+			

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,127 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQSESSION_H_
+#define ACTIVEMQ_ACTIVEMQSESSION_H_
+
+#include <cms/TopicSession.h>
+#include <activemq/ActiveMQException.h>
+
+namespace activemq{
+	
+	// Forward declarations.
+	class ActiveMQConnection;
+	
+	class ActiveMQSession : public cms::TopicSession
+	{
+	public:
+	
+		ActiveMQSession( 
+			ActiveMQConnection* connection,
+			const cms::Session::AcknowledgeMode acknowledgeMode );
+		virtual ~ActiveMQSession();		
+		
+		/**
+		 * Closes this session.  The connection will still be active.
+		 */
+		virtual void close() throw( cms::CMSException );
+		
+		/**
+		 * Commits the current transaction.
+		 * Unsupported by this class - throws exception.
+		 */
+		virtual void commit() throw( cms::CMSException ){
+			throw ActiveMQException( "operation (commit): unsupported by ActiveMQSession" );
+		}
+		
+		/**
+		 * Cancels the current transaction and rolls the system state
+		 * back to before the transaction occurred.
+		 * Unsupported by this class - throws exception.
+		 */
+		virtual void rollback() throw( cms::CMSException ){
+			throw ActiveMQException( "operation (rollback): unsupported by ActiveMQSession" );
+		}
+		
+		/**
+		 * Indicates whether the session is in transacted mode.
+		 * @return true if the session is in transacted mode.
+		 */
+		virtual bool getTransacted() const throw( cms::CMSException ){
+			return false;
+		}
+		
+		/**
+		 * Creates a TextMessage object.
+		 * @return a new text message object.
+		 */
+		virtual cms::TextMessage* createTextMessage() throw( cms::CMSException ); 
+		
+		/**
+		 * Creates an initialized TextMessage object.
+		 * @param msg The buffer to be set in the message.
+		 * @return A new text message object.
+		 */
+		virtual cms::TextMessage* createTextMessage( const char* msg ) 
+			throw( cms::CMSException );
+			
+		/**
+		 * Creates a BytesMessage object.
+		 * @return A new byte message object.
+		 */
+		virtual cms::BytesMessage* createBytesMessage() throw( cms::CMSException );
+		
+		/**
+		 * Creates a publisher for the specified topic.
+		 * @param topic The Topic to publish to, or null if this is an 
+		 * unidentified producer
+		 */
+		virtual cms::TopicPublisher* createPublisher( const cms::Topic* topic ) 
+			throw( cms::CMSException );
+		
+		/**
+		 * Creates a subscriber to the specified topic.
+		 * @param topic The Topic to publish to, or null if this is an 
+		 * unidentified producer
+		 */
+		virtual cms::TopicSubscriber* createSubscriber( const cms::Topic* topic ) 
+			throw( cms::CMSException );
+										
+		/**
+		 * Creates a topic identity given a Topic name.
+		 */				  
+		virtual cms::Topic* createTopic( const char* topicName ) 
+			throw( cms::CMSException );
+			
+		virtual ActiveMQConnection* getConnection(){
+			return connection;
+		}
+		
+		virtual const ActiveMQConnection* getConnection() const{
+			return connection;		
+		}
+		
+	private:
+	
+		ActiveMQConnection* connection;
+		bool transacted;
+		cms::Session::AcknowledgeMode acknowledgeMode;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQSESSION_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSession.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "ActiveMQSubscriber.h"
+#include "ActiveMQSession.h"
+#include "PendingMessagePoolSession.h"
+#include "ActiveMQConnection.h"
+#include "transport/Transport.h"
+#include <sstream>
+#include <cms/MessageListener.h>
+#include <time.h>
+#include <sys/time.h>
+
+using namespace activemq;
+using namespace activemq::transport;
+using namespace cms;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSubscriber::ActiveMQSubscriber( const cms::Topic* topic,
+	ActiveMQSession* session )
+{
+	this->topic = topic;
+	this->session = session;
+        
+    // Add this object as a listener to the topic.
+    Transport* transport = session->getConnection()->getTransportChannel();
+    if( transport != NULL ){
+        transport->addMessageListener( topic, this );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ActiveMQSubscriber::~ActiveMQSubscriber()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSubscriber::onMessage( const cms::Message* message ){
+	
+	// Notify the message listener of the new message.
+	notify( message );
+	
+	// Add the message to the pending message pool for any synchronous consumers
+	// (e.g. calls to receive(),etc).
+	pendingMessagePool.onMessage( message );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSubscriber::close() throw( cms::CMSException ){
+        
+    // Remove this object as a listener to the topic.
+    Transport* transport = session->getConnection()->getTransportChannel();
+    if( transport != NULL ){
+        transport->removeMessageListener( topic, this );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void ActiveMQSubscriber::notify( const Message* msg ){
+	
+	if( messageListener != NULL ){
+		messageListener->onMessage( msg );
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Message* ActiveMQSubscriber::receive() throw( CMSException ){
+	
+	// Create a session for the pending message pool.
+	PendingMessagePoolSession session( &pendingMessagePool );
+	
+	// Create the sleep time interval.  We'll sleep for a short
+	// time between each attempted read.
+	timespec sleepInterval;
+	sleepInterval.tv_sec = readSleepMicroseconds / 1000000;
+	sleepInterval.tv_nsec = (readSleepMicroseconds - (sleepInterval.tv_sec * 1000000 ))*1000;
+		
+	// Loop until we successfully receive the next message.
+	while( true ){
+	
+		// Read the next message for this session.
+		cms::Message* msg = session.popNextPendingMessage();
+		if( msg != NULL ){
+			
+			// We successfully received the next message - return it.
+			return msg;
+		}
+		
+		// No message exists, sleep a short while then try again.
+		timespec remaining;
+		nanosleep(&sleepInterval, &remaining);	
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Message* ActiveMQSubscriber::receive( long timeout ) throw( CMSException ){
+	
+	// Create a session for the pending message pool.
+	PendingMessagePoolSession session( &pendingMessagePool );
+	
+	// Create the sleep time interval.  We'll sleep for a short
+	// time between each attempted read.
+	timespec sleepInterval;
+	sleepInterval.tv_sec = readSleepMicroseconds / 1000000;
+	sleepInterval.tv_nsec = (readSleepMicroseconds - (sleepInterval.tv_sec * 1000000 ))*1000;
+	
+	// Get the current time.
+	timeval temp;
+	gettimeofday( &temp, NULL );
+	
+	// Get the start time in microseconds.	
+	long currTime = (temp.tv_sec * 1000000) + temp.tv_usec;
+	
+	// Determine the end time of this receive.  We will not wait
+	// any longer than this time.
+	long endTime = currTime + (timeout * 1000);
+		
+	// Loop while we haven't exceeded our max wait time.
+	while( currTime < endTime ){
+	
+		// Read the next message for this session.
+		cms::Message* msg = session.popNextPendingMessage();
+		if( msg != NULL ){
+			
+			// We successfully received the next message - return it.
+			return msg;
+		}
+		
+		// No message exists, sleep a short while then try again.
+		timespec remaining;
+		nanosleep(&sleepInterval, &remaining);
+			
+		// Update the running time total.
+		currTime += readSleepMicroseconds;			
+	}
+	
+	return NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Message* ActiveMQSubscriber::receiveNoWait() throw( CMSException ){
+	
+	// Create a session for the pending message pool.
+	PendingMessagePoolSession session( &pendingMessagePool );
+	
+	// Read the next message for this session and return it.
+	cms::Message* msg = session.popNextPendingMessage();	
+	return msg;
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,104 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQSUBSCRIBER_H_
+#define ACTIVEMQ_ACTIVEMQSUBSCRIBER_H_
+
+#include <cms/TopicSubscriber.h>
+#include <cms/Topic.h>
+#include <activemq/PendingMessagePool.h>
+#include <cms/MessageListener.h>
+#include <list>
+
+namespace activemq{
+	
+	// Forward decalarations.
+	class ActiveMQSession;
+	
+	class ActiveMQSubscriber
+	: 
+		public cms::TopicSubscriber,
+		public cms::MessageListener
+	{
+	public:
+	
+		ActiveMQSubscriber( const cms::Topic* topic, ActiveMQSession* session );
+		virtual ~ActiveMQSubscriber();
+		
+		/**
+		 * Gets the message consumer's MessageListener.
+		 */
+		virtual cms::MessageListener* getMessageListener() const 
+			throw( cms::CMSException ){
+			return messageListener;
+		}
+		
+		/**
+		 * Sets the message consumer's MessageListener.
+		 */
+		virtual void setMessageListener( cms::MessageListener* listener ) 
+			throw( cms::CMSException ){
+			messageListener = listener;
+		}
+		
+		/**
+		 * Receives the next message produced for this message consumer.
+		 */
+		virtual cms::Message* receive() throw( cms::CMSException );
+		
+		/**
+		 * Receives the next message that arrives within the specified 
+		 * timeout interval.
+		 * @param timeout The timeout value (in milliseconds)
+		 */
+		virtual cms::Message* receive( long timeout ) throw( cms::CMSException );
+		
+		/**
+		 * Receives the next message if one is immediately available.
+		 */
+		virtual cms::Message* receiveNoWait() throw( cms::CMSException );
+		
+		/**
+		 * Gets the Topic associated with this subscriber.
+		 */
+		virtual const cms::Topic* getTopic() const throw( cms::CMSException ){
+			return topic;
+		}
+		
+		/**
+		 * Closes this object and deallocates the appropriate resources.
+		 */
+		virtual void close() throw( cms::CMSException );
+		
+		virtual void onMessage( const cms::Message* msg );
+		
+	private:
+	
+		void notify( const cms::Message* msg );
+		
+	private:
+	
+		cms::MessageListener* messageListener;
+		const cms::Topic* topic;
+		ActiveMQSession* session;
+		PendingMessagePool pendingMessagePool;
+		static const long readSleepMicroseconds = 10000;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQSUBSCRIBER_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQSubscriber.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,64 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQTEXTMESSAGE_H_
+#define ACTIVEMQ_ACTIVEMQTEXTMESSAGE_H_
+
+#include <cms/TextMessage.h>
+
+namespace activemq{
+		
+	class ActiveMQTextMessage : public cms::TextMessage
+	{
+	public:
+	
+		ActiveMQTextMessage(){
+		}
+		virtual ~ActiveMQTextMessage(){
+		}
+		
+		virtual const char* getText() const throw( cms::CMSException ){
+			return message.c_str();
+		}
+		
+		virtual void setText( const char* msg ) throw( cms::CMSException ){
+			message = msg;
+		}
+		
+		virtual void acknowledge() throw( cms::CMSException ){			
+		}
+		
+        /**
+         * Clones this message.
+         * @return a copy of this message.  The caller is responsible
+         * for freeing this memory.
+         */
+        virtual cms::Message* clone() const{
+            
+            ActiveMQTextMessage* newMsg = new ActiveMQTextMessage();
+            newMsg->setText( message.c_str() );
+            return newMsg;
+        }
+        
+	private:
+	
+		std::string message;
+	};
+
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQTEXTMESSAGE_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTextMessage.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,53 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_ACTIVEMQTOPIC_H_
+#define ACTIVEMQ_ACTIVEMQTOPIC_H_
+
+#include <cms/Topic.h>
+#include <string>
+
+namespace activemq{
+	
+	class ActiveMQTopic : public cms::Topic
+	{
+	public:
+	
+		ActiveMQTopic( const char* name ){			
+			this->name = name;
+		}
+		ActiveMQTopic( const std::string& name ){			
+			this->name = name;
+		}
+		
+		virtual ~ActiveMQTopic(){}
+		
+		/**
+		 * Gets the name of this topic.
+		 * @return The topic name.
+		 */
+		virtual const char* getTopicName() const throw( cms::CMSException ){
+			return name.c_str();
+		}
+		
+	private:
+	
+		std::string name;
+	};
+}
+
+#endif /*ACTIVEMQ_ACTIVEMQTOPIC_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/ActiveMQTopic.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,147 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "PendingMessagePool.h"
+#include "concurrent/Lock.h"
+#include <cms/Message.h>
+
+using namespace activemq;
+using namespace activemq::concurrent;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+PendingMessagePool::PendingMessagePool()
+{
+}
+
+////////////////////////////////////////////////////////////////////////////////
+PendingMessagePool::~PendingMessagePool()
+{
+	// Remove all sessions.
+	for( unsigned int ix=0; ix<sessionQueues.size(); ++ix ){
+		removeSession(ix);
+	}
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int PendingMessagePool::addSession(){
+	
+	// Lock this class.
+	Lock lock( &mutex );
+	
+	// First, check any of the existing queues to see if they're no longer
+	// active (the boolean first in the pair indicates whether or not the
+	// given queue is actively being used).  If an inactive queue is found,
+	// use it.
+	for( unsigned int ix=0; ix<sessionQueues.size(); ++ix ){
+		if( sessionQueues[ix].first == false ){
+			sessionQueues[ix].first = true;
+			sessionQueues[ix].second.clear();
+			return ix;
+		}
+	}
+	
+	// No existing inactive queue was found - add one for this session.
+	sessionQueues.push_back( pair<bool, list<cms::Message*> >() );
+	
+	// Get the position of the inserted element.
+	unsigned int pos = sessionQueues.size()-1;
+	
+	// Mark this session as active.
+	sessionQueues[pos].first = true;
+	
+	// Return the index of the last added element as the session id.
+	return pos;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PendingMessagePool::removeSession( const unsigned int sessionId ){
+	
+	// Lock this class.
+	Lock lock( &mutex );
+	
+	// If the session is not in the pool, just return.
+	if( sessionId >= sessionQueues.size() ){
+		return;
+	}
+	
+	// Mark this session as inactive.
+	sessionQueues[sessionId].first = false;
+	
+	list<cms::Message*>& msgs = sessionQueues[sessionId].second;
+	list<cms::Message*>::iterator iter = msgs.begin();
+	for( ; iter != msgs.end(); ++iter ){
+        cms::Message* msg = *iter;
+        delete msg;
+	}
+	msgs.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* PendingMessagePool::popNextPendingMessage(
+	const unsigned int sessionId ){
+	
+	// Lock this class.
+	Lock lock( &mutex );
+	
+	// If this is an invalid or inactive session, just return NULL.
+	if( sessionId > sessionQueues.size() || sessionQueues[sessionId].first == false ){
+		return NULL;
+	}
+	
+	// Get the message q for this session.
+	list<cms::Message*>& msgs = sessionQueues[sessionId].second;
+	
+	// Get the number of messages in the queue.
+	unsigned int numMessages = msgs.size();
+	
+	// If there are no messages, just return.
+	if( numMessages == 0 ){
+		return NULL;
+	}
+	
+	// Remove the first message from the queue.
+	cms::Message* nextMsg = msgs.front();
+	msgs.pop_front();
+	
+	// Return the next message to the caller.  It is now the
+	// responsibility of the caller to free this memory.
+	return nextMsg;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void PendingMessagePool::onMessage( const cms::Message* msg ){
+	
+	// Lock this class.
+	Lock lock( &mutex );
+	
+	// Add a copy of this message to each active session queue.
+	for( unsigned int ix=0; ix<sessionQueues.size(); ++ix ){
+		
+		if( sessionQueues[ix].first == true ){
+			
+			list<cms::Message*>& msgs = sessionQueues[ix].second;
+			
+            // Cast away const-ness
+            cms::Message* tempMsg = const_cast<cms::Message*>(msg);
+            
+			// Clone the message.
+			msgs.push_back( tempMsg->clone() );
+		}
+	}
+}
+

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,96 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_PENDINGMESSAGEPOOL_H_
+#define ACTIVEMQ_PENDINGMESSAGEPOOL_H_
+
+#include <cms/MessageListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <vector>
+#include <list>
+#include <map>
+
+namespace activemq{
+	
+	/**
+	 * Represents a pool of pending messages that have yet to be
+	 * read by message consumers.  Each consumer is given a session
+	 * into this pool.  When a new message comes in, a copy of it is given to
+	 * each session.
+	 * 
+	 * @author Nathan Mittler
+	 */
+	class PendingMessagePool : public cms::MessageListener
+	{
+	public:
+	
+		/**
+		 * Default constructor.
+		 */
+		PendingMessagePool();
+		
+		/**
+		 * Destructor - destroys any remaining session queues.
+		 */
+		virtual ~PendingMessagePool();
+		
+		/**
+		 * Adds a new session.
+		 * @return 	The id of the added session.
+		 */
+		virtual unsigned int addSession();
+		
+		/**
+		 * Removes a session and destroys any pending messages for
+		 * that session.
+		 * @param sessionId The id of the session to be removed.
+		 */
+		virtual void removeSession( const unsigned int sessionId );
+		
+		/**
+		 * Reads the next message on the queue for the given session.  This is
+		 * a destructive read - the message will be popped from the queue.
+		 * @param sessionId The id of the session.
+		 * @return The next message for the session.  NULL if no messages exist.
+		 */
+		virtual cms::Message* popNextPendingMessage(
+			const unsigned int sessionId );
+			
+		/**
+		 * Invoked to add a new message to the pool
+		 * @param msg The new message to be added.
+		 */
+		virtual void onMessage( const cms::Message* msg );
+		
+	private:
+	
+		/**
+		 * The vector of session queues.  Each session is given a std::pair where
+		 * the first is a flag where true means the session is active.  The second
+		 * is the queue of messages for that session.  
+		 */
+		std::vector< std::pair< bool, std::list<cms::Message*> > > sessionQueues;
+		
+		/**
+		 * Synchronization mechanism.
+		 */
+		concurrent::Mutex mutex;
+	};
+
+}
+
+#endif /*ACTIVEMQ_PENDINGMESSAGEPOOL_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePool.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_PENDINGMESSAGEPOOLSESSION_H_
+#define ACTIVEMQ_PENDINGMESSAGEPOOLSESSION_H_
+
+#include <activemq/PendingMessagePool.h>
+
+namespace activemq{
+	
+	/**
+	 * This class represents a single session for a given
+	 * <code>PendingMessagePool</code> object. This class automates
+	 * the session management to simplify the client code.
+	 * 
+	 * @author Nathan Mittler
+	 */
+	class PendingMessagePoolSession{
+	public:
+	
+		/**
+		 * Constructor - adds a session to the pool.
+		 */
+		PendingMessagePoolSession( PendingMessagePool* pool ){
+			this->pool = pool;
+			sessionId = pool->addSession();
+		}
+		
+		/**
+		 * Destructor - removes this session from the pool.
+		 */
+		virtual ~PendingMessagePoolSession(){
+			pool->removeSession( sessionId );
+		}
+		
+		/**
+		 * Reads the next message for this session.  This is a destructive
+		 * read.
+		 */
+		cms::Message* popNextPendingMessage(){
+			
+			return pool->popNextPendingMessage( sessionId );
+		}
+		
+	private:
+	
+		/**
+		 * The message pool.
+		 */
+		PendingMessagePool* pool;
+		
+		/**
+		 * The unique identifier for this session.
+		 */
+		unsigned int sessionId;
+	};
+	
+}
+
+#endif /*ACTIVEMQ_PENDINGMESSAGEPOOLSESSION_H_*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/PendingMessagePoolSession.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONCURRENT_LOCK_H
+#define ACTIVEMQ_CONCURRENT_LOCK_H
+
+// Includes.
+#include <activemq/concurrent/Synchronizable.h>
+
+namespace activemq{
+namespace concurrent{
+    
+  /**
+   * A wrapper class around a given synchronization mechanism that
+   * provides automatic release upon destruction.
+   * @author  Nathan Mittler
+   */
+  class Lock
+  {
+  public:        // Interface
+
+    /**
+     * Constructor - initializes the object member and locks
+     * the object if desired.
+     * @param   object   The sync object to control
+     * @param   intiallyLocked  If true, the object will automatically
+     *                be locked.
+     * @see  SyncObject
+     */
+	Lock( Synchronizable* object, const bool intiallyLocked = true )
+	{
+		syncObject = object;
+		locked = false;
+		
+		if( intiallyLocked )
+	    {
+	      lock();
+	    }
+	}
+
+    /**
+     * Destructor - Unlocks the object if it is locked.
+     */
+    virtual ~Lock()
+    {
+	    if( locked )
+	    {
+	      syncObject->unlock();
+	    }
+  	}
+
+    /**
+     * Locks the object.
+     * @return  true if the lock was successful, otherwise false.
+     */
+    bool lock()
+    {
+	    locked = syncObject->lock();
+	    return locked;
+	}
+
+    /**
+     * Unlocks the object.
+     * @return  true if the unlock was successful, otherwise false.
+     */
+    bool unlock()
+    {
+	    syncObject->unlock();
+	    locked = false;
+	
+	    return locked;
+  	}
+
+    /**
+     * Indicates whether or not the object is locked.
+     * @return  true if the object is locked, otherwise false.
+     */
+    bool isLocked() const{ return locked; } 
+
+  private:       // Data
+
+    /**
+     * Flag to indicate whether or not this object has locked the
+     * sync object.
+     */
+    bool locked;
+
+    /**
+     * The synchronizable object to lock/unlock.
+     */
+    Synchronizable* syncObject;
+
+  };
+
+}}
+
+#endif // ACTIVEMQ_CONCURRENT_LOCK_H

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Lock.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONCURRENT_MUTEX_H
+#define ACTIVEMQ_CONCURRENT_MUTEX_H
+
+// Includes.
+#include <activemq/concurrent/Synchronizable.h>
+#include <pthread.h>
+
+namespace activemq{
+namespace concurrent{
+    
+  /**
+   * Creates a pthread_mutex_t object. The object is created
+   * such that successive locks from the same thread is allowed
+   * and will be successful.
+   * @author  Nathan Mittler
+   * @see  pthread_mutex_t
+   */
+  class Mutex : public Synchronizable
+  {
+  public:
+
+    /**
+     * Constructor - creates and initializes the mutex.
+     */
+    Mutex()
+    {
+	    // Create an attributes object and initialize it.
+	    // Assign the recursive attribute so that the same thread may
+	    // lock this mutex repeatedly.
+	    pthread_mutexattr_t attributes;
+	    pthread_mutexattr_init( &attributes );
+
+            #ifdef __USE_UNIX98
+	        pthread_mutexattr_settype( &attributes, PTHREAD_MUTEX_RECURSIVE );
+            #endif
+
+	    // Initialize the mutex.
+	    pthread_mutex_init( &mutex, &attributes );
+	
+	    // Destroy the attributes.
+	    pthread_mutexattr_destroy( &attributes );
+  	}
+
+    /**
+     * Destructor - destroys the mutex object.
+     */
+    virtual ~Mutex()
+    {
+	    // Unlock the mutex.
+	    unlock();
+	
+	    // Destroy the mutex.
+	    pthread_mutex_destroy( &mutex );
+  	}
+
+    /**
+     * Locks the object.
+     * @return  true if the lock was successful, otherwise false.
+     */
+    virtual bool lock()
+    {
+    	return pthread_mutex_lock( &mutex ) == 0;
+  	}
+
+    /**
+     * Unlocks the object.
+     * @return  true if the unlock was successful, otherwise false.
+     */
+    virtual bool unlock()
+    {
+    	return pthread_mutex_unlock( &mutex ) == 0;
+  	}
+
+  private:       // Data
+
+    /**
+     * The mutex object.
+     */
+    pthread_mutex_t mutex;
+
+  };
+
+}}
+
+#endif // ACTIVEMQ_CONCURRENT_MUTEX_H

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Mutex.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h Wed Mar  1 06:27:46 2006
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+#define ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H
+
+namespace activemq{
+namespace concurrent{
+    
+  /**
+   * The interface for all synchronizable objects (that is, objects
+   * that can be locked and unlocked).
+   * @author Nathan Mittler
+   */
+  class Synchronizable
+  {
+  public:        // Abstract Interface
+
+	virtual ~Synchronizable(){}
+	
+    /**
+     * Locks the object.
+     * @return  true if the lock was successful, otherwise false.
+     */
+    virtual bool lock() = 0;
+
+    /**
+     * Unlocks the object.
+     * @return  true if the unlock was successful, otherwise false.
+     */
+    virtual bool unlock() = 0;
+
+  }; 
+
+}}
+
+#endif /*ACTIVEMQ_CONCURRENT_SYNCHRONIZABLE_H*/

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/concurrent/Synchronizable.h
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL

Added: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp?rev=382028&view=auto
==============================================================================
--- incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp (added)
+++ incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp Wed Mar  1 06:27:46 2006
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include "BufferedInputStream.h"
+#include <algorithm>
+
+using namespace activemq::io;
+using namespace std;
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::BufferedInputStream( InputStream* stream )
+{
+	// Default to a 1k buffer.
+	init( stream, 1024 );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::BufferedInputStream( InputStream* stream, 
+	const int bufferSize )
+{
+	init( stream, bufferSize );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+BufferedInputStream::~BufferedInputStream()
+{
+    // Destroy the buffer.
+    if( buffer != NULL ){
+        delete [] buffer;
+        buffer = NULL;
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::init( InputStream* stream, const int bufferSize ){
+	
+	this->stream = stream;
+	this->bufferSize = bufferSize;
+	
+	// Create the buffer and initialize the head and tail positions.
+	buffer = new char[bufferSize];
+	head = 0;
+	tail = 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::close() throw(cms::CMSException){
+	
+	// Close the delegate stream.
+	stream->close();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+char BufferedInputStream::read() throw (ActiveMQException){
+	
+	// If we don't have any data buffered yet - read as much as we can.	
+	if( tail == head ){
+		bufferData();
+	}
+	
+	// Get the next character.
+	char returnValue = buffer[head++];
+	
+	// If the buffer is now empty - reset it to the beginning of the buffer.
+	if( tail == head ){
+		tail = head = 0;
+	}
+	
+	return returnValue;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int BufferedInputStream::read( char* buffer, 
+	const int bufferSize ) throw (ActiveMQException){
+		
+	int totalRead = 0;
+	
+	// Get the number of bytes that can be copied directly from
+	// the buffer.
+	int bytesToCopy = min( tail-head, bufferSize );
+	
+	// Copy the data to the output buffer.	
+	memcpy( buffer, this->buffer+head, bytesToCopy );
+	
+	// Increment the total bytes read.
+	totalRead += bytesToCopy;
+	
+	// Increment the head position.  If the buffer is now empty,
+	// reset the positions.
+	head += bytesToCopy;
+	if( head == tail ){
+		head = tail = 0;
+	}
+	
+	// If we still haven't filled the output buffer, read a buffer's
+	// worth from the stream.
+	if( bytesToCopy < bufferSize ){
+		
+		// Buffer as much data as we can.
+		bufferData();
+		
+		// Get the remaining bytes to copy.
+		bytesToCopy = min( tail-head, (bufferSize-bytesToCopy) );
+		
+		// Copy the data to the output buffer.	
+		memcpy( buffer+totalRead, this->buffer+head, bytesToCopy );
+		
+		// Increment the total bytes read.
+		totalRead += bytesToCopy;
+	}
+	
+	// Return the total number of bytes read.
+	return totalRead;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void BufferedInputStream::bufferData() throw (ActiveMQException){
+	
+	if( tail == bufferSize ){
+		throw ActiveMQException( "BufferedInputStream::bufferData - buffer full" );
+	}
+	
+	// Read in as many bytes as we can.
+	int bytesRead = stream->read( buffer+tail, bufferSize-tail );
+	if( bytesRead == 0 ){
+		throw ActiveMQException("BufferedInputStream::read() - failed reading bytes from stream");
+	}
+	
+	// Increment the tail to the new end position.
+	tail += bytesRead;
+}

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/activemq/trunk/cms/activemqcms/src/activemq/io/BufferedInputStream.cpp
------------------------------------------------------------------------------
    svn:keywords = Date Author Id Revision HeadURL



Mime
View raw message