activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From nmitt...@apache.org
Subject svn commit: r418749 [3/17] - in /incubator/activemq/trunk/activemq-cpp: ./ src/ src/main/ src/main/activemq/ src/main/activemq/concurrent/ src/main/activemq/connector/ src/main/activemq/connector/openwire/ src/main/activemq/connector/stomp/ src/main/ac...
Date Mon, 03 Jul 2006 11:51:54 GMT
Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,325 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "StompCommandReader.h"
+
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/concurrent/Thread.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::concurrent;
+using namespace activemq::connector;
+using namespace activemq::connector::stomp;
+using namespace activemq::transport;
+using namespace activemq::io;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+StompCommandReader::StompCommandReader(void)
+{
+    inputStream = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompCommandReader::StompCommandReader(InputStream* is)
+{
+    inputStream = is;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+Command* StompCommandReader::readCommand(void) 
+    throw (CommandIOException)
+{
+    try
+    {
+        // Create a new Frame for reading to.
+        StompFrame* frame = new StompFrame();
+       
+        // Read the command into the frame.
+        readStompCommand( *frame );
+       
+        // Read the headers.
+        readStompHeaders( *frame );
+       
+        // Read the body.
+        readStompBody( *frame );
+       
+        // Return the Command, caller must delete it.
+        return marshaler.marshal( frame );
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandReader::readStompCommand( StompFrame& frame ) 
+   throw ( StompConnectorException )
+{  
+    // Read the command;
+    int numChars = readStompHeaderLine();
+
+    if( numChars <= 0 )
+    {
+        throw StompConnectorException(
+            __FILE__, __LINE__,
+            "StompCommandReader::readStompCommand: "
+            "Error on Read of Command Header" );
+    }
+
+    // Set the command in the frame - copy the memory.
+    frame.setCommand( reinterpret_cast<char*>(&buffer[0]) );
+
+    // Clean up the mess.
+    buffer.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandReader::readStompHeaders( StompFrame& frame ) 
+   throw (StompConnectorException)
+{
+    // Read the command;
+    bool endOfHeaders = false;
+
+    while( !endOfHeaders )
+    {
+        // Clean up the mess.
+        buffer.clear();
+
+        // Read in the next header line.
+        int numChars = readStompHeaderLine();
+
+        if( numChars == 0 )
+        {
+            // should never get here
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompCommandReader::readStompHeaders: no characters read" );
+        }
+      
+        // Check for an empty line to demark the end of the header section.
+        // if its not the end then we have a header to process, so parse it.
+        if( numChars == 1 && buffer[0] == '\0' )
+        {
+            endOfHeaders = true;
+        }
+        else
+        {
+            // Search through this line to separate the key/value pair.
+            for( size_t ix = 0; ix < buffer.size(); ++ix )
+            {
+                // If found the key/value separator...
+                if( buffer[ix] == ':' )
+                {
+                    // Null-terminate the key.
+                    buffer[ix] = '\0'; 
+
+                    const char* key = reinterpret_cast<char*>(&buffer[0]);
+                    const char* value = reinterpret_cast<char*>(&buffer[ix+1]);
+               
+                    // Assign the header key/value pair.
+                    frame.getProperties().setProperty(key, value);
+               
+                    // Break out of the for loop.
+                    break;
+                }
+            }
+        }
+    }
+
+    // Clean up the mess.
+    buffer.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int StompCommandReader::readStompHeaderLine(void) 
+    throw (StompConnectorException)
+{
+    int count = 0;
+  
+    while( true )
+    {
+        // Read the next char from the stream.
+        buffer.push_back( inputStream->read() );
+      
+        // Increment the position pointer.
+        count++;
+      
+        // If we reached the line terminator, return the total number
+        // of characters read.
+        if( buffer[count-1] == '\n' )
+        {
+            // Overwrite the line feed with a null character. 
+            buffer[count-1] = '\0';
+         
+            return count;
+        }
+    }
+   
+    // If we get here something bad must have happened.
+    throw StompConnectorException(
+        __FILE__, __LINE__,
+        "StompCommandReader::readStompHeaderLine: "
+        "Unrecoverable, error condition");
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandReader::readStompBody( StompFrame& frame ) 
+   throw ( StompConnectorException )
+{
+    unsigned long content_length = 0;
+   
+    if(frame.getProperties().hasProperty(
+        commands::CommandConstants::toString(
+            commands::CommandConstants::HEADER_CONTENTLENGTH)))
+    {
+        char* stopped_string = NULL;
+      
+        string length = 
+            frame.getProperties().getProperty(
+                commands::CommandConstants::toString(
+                    commands::CommandConstants::HEADER_CONTENTLENGTH));
+            
+        content_length = strtoul(
+            length.c_str(), 
+            &stopped_string, 
+            10);
+     }
+
+     if(content_length != 0)
+     {
+        // For this case its assumed that content length indicates how 
+        // much to read.  We reserve space in the buffer for it to 
+        // minimize the number of reallocs that might occur.  We are
+        // assuming that content length doesn't count the trailing null
+        // that indicates the end of frame.  The reserve won't do anything
+        // if the buffer already has that much capacity.  The resize call
+        // basically sets the end iterator to the correct location since
+        // this is a char vector and we already reserve enough space.
+        // Resize doesn't realloc the vector smaller if content_length
+        // is less than capacity of the buffer, it just move the end
+        // iterator.  Reserve adds the benefit that the mem is set to 
+        // zero.  Over time as larger messages come in thsi will cause
+        // us to adapt to that size so that future messages that are
+        // around that size won't alloc any new memory.
+
+        buffer.reserve( content_length );
+        buffer.resize( content_length );
+
+        // Read the Content Length now
+        read( &buffer[0], content_length );
+
+        // Content Length read, now pop the end terminator off (\0\n).
+        if(inputStream->read() != '\0' ||
+           inputStream->read() != '\n')
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompCommandReader::readStompBody: "
+                "Read Content Length, and no trailing null");
+        }
+    }
+    else
+    {
+        // Content length was either zero, or not set, so we read until the
+        // first null is encounted.
+      
+        while( true )
+        {
+            char byte = inputStream->read();
+         
+            buffer.push_back(byte);
+        
+            content_length++;
+
+            if(byte != '\0')
+            {            
+                continue;
+            }
+
+            // We read up to the first NULL, now lets pop off the required
+            // newline to complete the packet.
+            if(inputStream->read() != '\n')
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompCommandReader::readStompBody: "
+                    "Read Body, and no trailing newline");
+            }
+
+            break;  // Read null and newline we are done.
+        }
+    }
+
+    if( content_length != 0 )
+    {
+        char* cpyBody = new char[content_length];
+        memcpy(cpyBody, &buffer[0], content_length);
+
+        // Set the body contents in the frame - copy the memory
+        frame.setBody( cpyBody, content_length );
+    }
+
+    // Clean up the mess.
+    buffer.clear();
+}
+
+////////////////////////////////////////////////////////////////////////////////
+int StompCommandReader::read(unsigned char* buffer, int count) 
+   throw(io::IOException)
+{
+    if( inputStream == NULL )
+    {
+        throw IOException( 
+            __FILE__, __LINE__, 
+            "StompCommandReader::read(char*,int) - input stream is NULL" );
+    }
+   
+    int head = 0;
+   
+    // We call the read(buffer, size) version asking for one
+    // byte, if this returns zero, then there wasn't anything 
+    // on the stream to read, so we try again after a short 
+    // pause in hopes that some more data will show up.
+    while( true )
+    {
+        head += inputStream->read(&buffer[head], count - head);
+      
+        if(head == count)
+        {
+            return count;
+        }
+      
+        // Got here, so we wait a bit and try again.
+        Thread::sleep( 10 );
+    }
+}
+ 
+////////////////////////////////////////////////////////////////////////////////
+unsigned char StompCommandReader::readByte(void) throw(io::IOException)
+{
+    if( inputStream == NULL )
+    {
+        throw IOException( 
+            __FILE__, __LINE__, 
+            "StompCommandReader::read(char*,int) - input stream is NULL" );
+    }
+   
+    unsigned char c = 0;
+    inputStream->read(&c, 1);
+    return c;
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandReader.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,146 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDREADER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDREADER_H_
+
+#include <activemq/transport/CommandReader.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/transport/Command.h>
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/StompConnectorException.h>
+#include <activemq/connector/stomp/marshal/Marshaler.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompCommandReader : public transport::CommandReader
+    {
+    private:
+   
+        /**
+         * The target input stream.
+         */
+        io::InputStream* inputStream;
+      
+        /**
+         * Vector Object used to buffer data
+         */
+        std::vector<unsigned char> buffer;
+        
+        /**
+         * Marshaler of Stomp Commands
+         */
+        marshal::Marshaler marshaler;
+
+    public:
+
+        /**
+         * Deafult Constructor
+         */
+    	StompCommandReader( void );
+
+        /**
+         * Constructor.
+         * @param is the target input stream.
+         */
+        StompCommandReader( io::InputStream* is );
+
+        /**
+         * Destructor
+         */
+    	virtual ~StompCommandReader(void) {}
+
+        /**
+         * Reads a command from the given input stream.
+         * @return The next command available on the stream.
+         * @throws CommandIOException if a problem occurs during the read.
+         */
+        virtual transport::Command* readCommand( void ) 
+            throw ( transport::CommandIOException );
+
+        /**
+         * Sets the target input stream.
+         * @param Target Input Stream
+         */
+        virtual void setInputStream(io::InputStream* is){
+            inputStream = is;
+        }
+      
+        /**
+         * Gets the target input stream.
+         * @return Target Input Stream
+         */
+        virtual io::InputStream* getInputStream( void ){
+            return inputStream;
+        }
+
+        /**
+         * Attempts to read an array of bytes from the stream.
+         * @param buffer The target byte buffer.
+         * @param count The number of bytes to read.
+         * @return The number of bytes read.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual int read(unsigned char* buffer, int count) 
+            throw( io::IOException );
+       
+        /**
+         * Attempts to read a byte from the input stream
+         * @return The byte.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual unsigned char readByte(void) throw( io::IOException );
+
+    private:
+    
+        /**
+         * Read the Stomp Command from the Frame
+         * @param reference to a Stomp Frame
+         * @throws StompConnectorException
+         */
+        void readStompCommand( StompFrame& frame ) 
+            throw ( StompConnectorException );
+
+        /** 
+         * Read all the Stomp Headers for the incoming Frame
+         * @param Frame to place data into
+         * @throws StompConnectorException
+         */
+        void readStompHeaders( StompFrame& frame ) 
+            throw ( StompConnectorException );
+
+        /**
+         * Reads a Stomp Header line and stores it in the buffer object
+         * @return number of bytes read, zero if there was a problem.
+         * @throws StompConnectorException
+         */
+        int readStompHeaderLine( void ) throw ( StompConnectorException );
+
+        /**
+         * Reads the Stomp Body from the Wire and store it in the frame.
+         * @param Stomp Frame to place data in
+         */
+        void readStompBody( StompFrame& frame ) 
+            throw ( StompConnectorException );
+    
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDREADER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "StompCommandWriter.h"
+
+#include <activemq/connector/stomp/StompFrame.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+using namespace activemq::transport;
+using namespace activemq::io;
+using namespace activemq::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+StompCommandWriter::StompCommandWriter(void)
+{
+    outputStream = NULL;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompCommandWriter::StompCommandWriter(OutputStream* os)
+{
+    outputStream = os;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandWriter::writeCommand( const Command* command ) 
+    throw ( transport::CommandIOException )
+{
+    try
+    {
+        if( outputStream == NULL )
+        {
+            throw CommandIOException( 
+                __FILE__, __LINE__, 
+                "StompCommandWriter::writeCommand - "
+                "output stream is NULL" );
+        }
+
+        const StompFrame& frame = marshaler.marshal( command );
+
+        // Write the command.
+        const string& cmdString = frame.getCommand();
+        write( cmdString.c_str(), cmdString.length() );
+        writeByte( '\n' );
+
+        // Write all the headers.
+        vector< pair<string,string> > headers = frame.getProperties().toArray();   
+        for( unsigned int ix=0; ix < headers.size(); ++ix )
+        {
+            string& name = headers[ix].first;
+            string& value = headers[ix].second;
+
+            write( name.c_str(), name.length() );
+            writeByte( ':' );
+            write( value.c_str(), value.length() );
+            writeByte( '\n' );       
+        }
+
+        // Finish the header section with a form feed.
+        writeByte( '\n' );
+
+        // Write the body.
+        const char* body = frame.getBody();
+        if( body != NULL ) 
+        {
+            write( body, frame.getBodyLength() );
+        }
+
+        if( ( frame.getBodyLength() == 0 ) ||
+            ( frame.getProperties().getProperty( 
+                  CommandConstants::toString( 
+                      CommandConstants::HEADER_CONTENTLENGTH ), "" ) != "" ) )
+        {
+            writeByte( '\0' );
+        }
+
+        writeByte( '\n' );
+
+        // Flush the stream.
+        outputStream->flush();
+    }
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandWriter::write(const unsigned char* buffer, int count) 
+    throw(IOException)
+{
+    if( outputStream == NULL )
+    {
+        throw IOException( 
+            __FILE__, __LINE__, 
+            "StompCommandWriter::write(char*,int) - input stream is NULL" );
+    }
+
+    outputStream->write( buffer, count );
+}
+ 
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandWriter::writeByte(unsigned char v) throw(IOException)
+{
+    if( outputStream == NULL )
+    {
+        throw IOException( 
+            __FILE__, __LINE__, 
+            "StompCommandWriter::write(char) - input stream is NULL" );
+    }
+   
+    outputStream->write( v );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompCommandWriter::write(const char* buffer, int count) 
+   throw(io::IOException)
+{
+    write(reinterpret_cast<const unsigned char*>(buffer), count);
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompCommandWriter.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDWRITER_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDWRITER_H_
+
+#include <activemq/transport/CommandWriter.h>
+#include <activemq/io/InputStream.h>
+#include <activemq/transport/CommandIOException.h>
+#include <activemq/connector/stomp/StompConnectorException.h>
+#include <activemq/transport/Command.h>
+#include <activemq/io/OutputStream.h>
+#include <activemq/connector/stomp/marshal/Marshaler.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompCommandWriter : public transport::CommandWriter
+    {
+    private:
+    
+        /**
+         * Target output stream.
+         */
+        io::OutputStream* outputStream;
+
+        /**
+         * Marshaler of Stomp Commands
+         */
+        marshal::Marshaler marshaler;
+
+    public:
+    
+        /**
+         * Default Constructor
+         */
+    	StompCommandWriter(void);
+
+        /**
+         * Constructor.
+         * @param os the target output stream.
+         */
+        StompCommandWriter( io::OutputStream* os );
+
+        /**
+         * Destructor
+         */
+    	virtual ~StompCommandWriter(void) {}
+
+        /**
+         * Sets the target output stream.
+         */
+        virtual void setOutputStream(io::OutputStream* os){
+            outputStream = os;
+        }
+      
+        /**
+         * Gets the target output stream.
+         */
+        virtual io::OutputStream* getOutputStream(void){
+            return outputStream;
+        }
+
+        /**
+         * Writes a command to the given output stream.
+         * @param command the command to write.
+         * @param os the target stream for the write.
+         * @throws CommandIOException if a problem occurs during the write.
+         */
+        virtual void writeCommand( const transport::Command* command ) 
+            throw ( transport::CommandIOException );
+
+        /**
+         * Writes a byte array to the output stream.
+         * @param buffer a byte array
+         * @param count the number of bytes in the array to write.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual void write(const unsigned char* buffer, int count) 
+            throw( io::IOException );
+       
+        /**
+         * Writes a byte to the output stream.
+         * @param v The value to be written.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual void writeByte(unsigned char v) throw( io::IOException );
+
+    private:
+   
+        /**
+         * Writes a char array to the output stream.
+         * @param buffer a char array
+         * @param count the number of bytes in the array to write.
+         * @throws IOException thrown if an error occurs.
+         */
+        virtual void write(const char* buffer, int count) 
+            throw( io::IOException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPCOMMANDWRITER_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,795 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/concurrent/Concurrent.h>
+#include <activemq/transport/BrokerError.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/ExceptionResponse.h>
+#include <activemq/connector/stomp/StompTopic.h>
+#include <activemq/connector/stomp/StompQueue.h>
+#include <activemq/connector/stomp/commands/ConnectCommand.h>
+#include <activemq/connector/stomp/commands/ErrorCommand.h>
+#include <activemq/connector/stomp/commands/BeginCommand.h>
+#include <activemq/connector/stomp/commands/AbortCommand.h>
+#include <activemq/connector/stomp/commands/AckCommand.h>
+#include <activemq/connector/stomp/commands/CommitCommand.h>
+#include <activemq/connector/stomp/commands/MessageCommand.h>
+#include <activemq/connector/stomp/commands/BytesMessageCommand.h>
+#include <activemq/connector/stomp/commands/TextMessageCommand.h>
+#include <activemq/connector/stomp/commands/ConnectedCommand.h>
+#include <activemq/connector/stomp/commands/DisconnectCommand.h>
+#include <activemq/exceptions/UnsupportedOperationException.h>
+#include <activemq/connector/stomp/StompProducerInfo.h>
+#include <activemq/connector/stomp/StompTransactionInfo.h>
+#include <activemq/util/Integer.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::connector;
+using namespace activemq::util;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+using namespace activemq::connector::stomp;
+using namespace activemq::connector::stomp::commands;
+
+////////////////////////////////////////////////////////////////////////////////
+StompConnector::StompConnector( Transport* transport, 
+                                const util::Properties& properties )
+    throw ( IllegalArgumentException )
+{
+    if(transport == NULL)
+    {
+        throw IllegalArgumentException(
+            __FILE__, __LINE__,
+            "StompConnector::StompConnector - Transport cannot be NULL");
+    }
+    
+    this->transport = transport;
+    this->state = DISCONNECTED;
+    this->exceptionListener = NULL;
+    this->messageListener = NULL;
+    this->sessionManager = NULL;
+    this->nextProducerId = 0;
+    this->nextTransactionId = 0;
+    this->properties.copy( &properties );
+    
+    // Observe the transport for events.
+    this->transport->setCommandListener( this );
+    this->transport->setTransportExceptionListener( this );
+
+    // Setup the reader and writer in the transport.
+    this->transport->setCommandReader( &reader );
+    this->transport->setCommandWriter( &writer );
+
+    // Register ourself for those commands that we process    
+    addCmdListener( CommandConstants::ERROR_CMD, this );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+StompConnector::~StompConnector(void)
+{
+    try
+    {
+        close();
+        
+        delete sessionManager;
+    }
+    AMQ_CATCH_NOTHROW( ActiveMQException )
+    AMQ_CATCHALL_NOTHROW( )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompConnector::getNextProducerId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextProducerId++;
+    }
+    
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+unsigned int StompConnector::getNextTransactionId(void)
+{
+    synchronized(&mutex)
+    {
+        return nextTransactionId++;
+    }
+    
+    return 0;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::enforceConnected( void ) throw ( ConnectorException )
+{
+    if( state != CONNECTED )
+    {
+        throw StompConnectorException(
+            __FILE__, __LINE__,
+            "StompConnector::enforceConnected - Not Connected!" );
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::addCmdListener( 
+    commands::CommandConstants::CommandId commandId,
+    StompCommandListener* listener )
+{
+    cmdListenerMap.insert( make_pair( commandId, listener ) );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::removeCmdListener( 
+    commands::CommandConstants::CommandId commandId )
+{
+    cmdListenerMap.erase(commandId);
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::start(void) throw( cms::CMSException )
+{
+    try
+    {
+        synchronized( &mutex )
+        {
+            if( state == CONNECTED )
+            {
+                throw ActiveMQException( 
+                    __FILE__, __LINE__, 
+                    "StompConnector::start - already started" );
+            }
+                
+            // Start the transport - this establishes the socket.
+            transport->start();
+
+            // Send the connect message to the broker.
+            connect();         
+        }        
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::close(void) throw( cms::CMSException ){
+    
+    try
+    {
+        synchronized( &mutex )
+        {  
+            if( state == this->CONNECTED )
+            {
+                // Send the disconnect message to the broker.
+                disconnect();
+
+                // Close the transport.
+                transport->close();
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::connect(void)
+{
+    try
+    {
+        // Mark this connector as started.
+        state = this->CONNECTING;
+
+        // Send the connect command to the broker 
+        ConnectCommand cmd;
+
+        // Encode User Name and Password and Client ID
+        string login = getLogin();
+        if( login.length() > 0 ){
+            cmd.setLogin( login );
+        }        
+        string password = getPassword();
+        if( password.length() > 0 ){
+            cmd.setPassword( password );
+        }        
+        string clientId = getClientId();
+        if( clientId.length() > 0 ){
+            cmd.setClientId( clientId );
+        }
+
+        Response* response = transport->request( &cmd );
+        
+        if( dynamic_cast< ExceptionResponse* >( response ) != NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::connect - Failed on Connect Request" );
+        }
+
+        ConnectedCommand* connected = 
+            dynamic_cast< ConnectedCommand* >( response );
+
+        if( connected == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::connect - "
+                "Response not a connected response" );            
+        }
+
+        // Connected so we now create the SessionManager
+        sessionManager = new StompSessionManager(
+            connected->getSessionId(), transport );
+
+        // Give our message listener to the session manager it will
+        // notify all the interested clients                
+        sessionManager->setConsumerMessageListener( messageListener );
+
+        // Add the Session Manager as the Command Listener for 
+        // Message commands so that it can route them to the 
+        // correct consumers.
+        addCmdListener( CommandConstants::MESSAGE, sessionManager );
+        
+        // In Stomp, the client Id is the same as the session id that is
+        // returned in the Connected response
+        properties.setProperty( 
+            commands::CommandConstants::toString( 
+                commands::CommandConstants::HEADER_CLIENT_ID ),
+            connected->getSessionId() );        
+
+        // Tag us in the Connected State now.
+        state = CONNECTED;
+        
+        // Clean up
+        delete response;
+    }
+    AMQ_CATCH_RETHROW( BrokerError )
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::disconnect(void)
+{
+    try
+    {
+        // Mark state as no longer connected.
+        state = this->DISCONNECTED;
+
+        // Send the disconnect command to the broker.
+        DisconnectCommand cmd;
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ActiveMQException )
+    AMQ_CATCHALL_THROW( ActiveMQException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+SessionInfo* StompConnector::createSession(
+    cms::Session::AcknowledgeMode ackMode) 
+        throw( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createSession( ackMode );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* StompConnector::createConsumer(
+    cms::Destination* destination, 
+    SessionInfo* session,
+    const std::string& selector)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createConsumer( 
+            destination, session, selector );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConsumerInfo* StompConnector::createDurableConsumer(
+    cms::Topic* topic, 
+    SessionInfo* session,
+    const std::string& name,
+    const std::string& selector,
+    bool noLocal)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return sessionManager->createDurableConsumer( 
+            topic, session, name, selector, noLocal );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ProducerInfo* StompConnector::createProducer(
+    cms::Destination* destination, 
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        ProducerInfo* producer = new StompProducerInfo();
+        
+        producer->setDestination( *destination );
+        producer->setProducerId( getNextProducerId() );
+        producer->setSessionInfo( session );
+        
+        return producer;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Topic* StompConnector::createTopic(const std::string& name, 
+                                        SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return new StompTopic(name);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Queue* StompConnector::createQueue(const std::string& name, 
+                                        SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        return new StompQueue(name);
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryTopic* StompConnector::createTemporaryTopic(
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryTopic - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TemporaryQueue* StompConnector::createTemporaryQueue(
+    SessionInfo* session)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::send(cms::Message* message, 
+                          ProducerInfo* producerInfo) 
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        const SessionInfo* session = producerInfo->getSessionInfo();
+        Command* command = dynamic_cast< transport::Command* >( message );
+        
+        if( command == NULL )
+        {
+            throw StompConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::send - "
+                "Message is not a valid stomp type.");
+        }
+
+        if( session->getAckMode() == cms::Session::Transactional )
+        {
+            StompCommand* stompCommand = 
+                dynamic_cast< StompCommand* >( message );
+
+            if( stompCommand == NULL )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompConnector::send - "
+                    "Message is not a valid stomp type.");
+            }
+    
+            stompCommand->setTransactionId(
+                Integer::toString( 
+                    session->getTransactionInfo()->getTransactionId() ) );
+        }
+        
+        // Send it
+        transport->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::send(std::list<cms::Message*>& messages,
+                          ProducerInfo* producerInfo) 
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        list<cms::Message*>::const_iterator itr = messages.begin();
+        
+        for(; itr != messages.end(); ++itr)
+        {
+            this->send(*itr, producerInfo);
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::acknowledge( const SessionInfo* session,
+                                  const cms::Message* message,
+                                  AckType ackType = ConsumedAck )
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        // Auto to Stomp means don't do anything, so we drop it here
+        // for client acknowledge we have to send and ack.  
+        if( session->getAckMode() == cms::Session::ClientAcknowledge )
+        {
+            AckCommand cmd;
+
+            if( message->getCMSMessageId() == NULL )
+            {
+                throw StompConnectorException(
+                    __FILE__, __LINE__,
+                    "StompConnector::send - "
+                    "Message has no Message Id, cannot ack.");
+            }
+
+            cmd.setMessageId( message->getCMSMessageId() );
+
+            if( session->getAckMode() == cms::Session::Transactional )
+            {
+                cmd.setTransactionId( 
+                    Integer::toString( 
+                        session->getTransactionInfo()->getTransactionId() ) );
+            }
+            
+            transport->oneway( &cmd );
+        }
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+TransactionInfo* StompConnector::startTransaction(
+    SessionInfo* session) 
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        TransactionInfo* transaction = new StompTransactionInfo();
+        
+        transaction->setTransactionId( getNextTransactionId() );
+        
+        session->setTransactionInfo( transaction );
+
+        BeginCommand cmd;
+
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+        
+        return transaction;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::commit(TransactionInfo* transaction, 
+                            SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        CommitCommand cmd;
+        
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::rollback(TransactionInfo* transaction, 
+                              SessionInfo* session)
+    throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        AbortCommand cmd;
+        
+        cmd.setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        
+        transport->oneway( &cmd );
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::Message* StompConnector::createMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        MessageCommand* cmd = new MessageCommand();
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::BytesMessage* StompConnector::createBytesMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        BytesMessageCommand* cmd = new BytesMessageCommand();
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::TextMessage* StompConnector::createTextMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        enforceConnected();
+        
+        TextMessageCommand* cmd = new TextMessageCommand;
+        
+        if( transaction != NULL )
+        {
+            cmd->setTransactionId( 
+                Integer::toString( transaction->getTransactionId() ) );
+        }
+        
+        return cmd;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+cms::MapMessage* StompConnector::createMapMessage(
+    SessionInfo* session,
+    TransactionInfo* transaction)
+        throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::unsubscribe(const std::string& name)
+    throw ( ConnectorException )
+{
+    try
+    {
+        throw UnsupportedOperationException(
+            __FILE__, __LINE__, 
+            "StompConnector::createTemporaryQueue - No Stomp Support");
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::destroyResource( ConnectorResource* resource )
+    throw ( ConnectorException )
+{
+    try
+    {
+        ConsumerInfo* consumer = 
+            dynamic_cast<ConsumerInfo*>(resource);
+        SessionInfo* session = 
+            dynamic_cast<SessionInfo*>(resource);
+
+        if( consumer != NULL)
+        {
+            sessionManager->removeConsumer( consumer );
+        }
+        else if( session != NULL)
+        {
+            sessionManager->removeSession( session );
+        }
+
+        // No matter what we end it here.
+        delete resource;
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onCommand( transport::Command* command )
+{
+    try
+    {
+        StompCommand* stompCommand = dynamic_cast< StompCommand* >(command);
+
+        if(stompCommand == NULL)
+        {
+            fire( ConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::onCommand - Recieved an unknown Command") );
+        }
+
+        CmdListenerMap::iterator itr = 
+            cmdListenerMap.find( stompCommand->getStompCommandId() );
+            
+        if( itr == cmdListenerMap.end() )
+        {
+            fire( ConnectorException(
+                __FILE__, __LINE__,
+                "StompConnector::onCommand - "
+                "Recieved command with no listener") );
+
+            // This isn't going an farther, so delete it.
+            delete command;
+
+            return;   // we are done
+        }
+        
+        // Hand off
+        itr->second->onStompCommand( stompCommand );         
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onTransportException( 
+    transport::Transport* source, 
+    const exceptions::ActiveMQException& ex )
+{
+    try
+    {
+        // Inform the user.
+        fire( ex );
+        
+        // Close down.
+        close();
+    }
+    AMQ_CATCH_RETHROW( ConnectorException )
+    AMQ_CATCHALL_THROW( ConnectorException );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void StompConnector::onStompCommand( commands::StompCommand* command ) 
+    throw ( StompConnectorException )
+{
+    try
+    {        
+        ErrorCommand* error = 
+            dynamic_cast<ErrorCommand*>(command);
+        
+        if(error != NULL)
+        {
+            fire( StompConnectorException(
+                __FILE__, __LINE__,
+                (string( "StompConnector::onStompCommand - " ) + 
+                error->getErrorMessage() ).c_str() ) );
+                
+            // Shutdown
+            close();
+        }
+    }
+    AMQ_CATCH_RETHROW( StompConnectorException )
+    AMQ_CATCHALL_THROW( StompConnectorException );
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnector.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,533 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_
+
+#include <activemq/connector/Connector.h>
+#include <activemq/transport/Transport.h>
+#include <activemq/transport/CommandListener.h>
+#include <activemq/transport/TransportExceptionListener.h>
+#include <activemq/concurrent/Mutex.h>
+#include <activemq/util/Properties.h>
+#include <activemq/connector/stomp/StompCommandReader.h>
+#include <activemq/connector/stomp/StompCommandWriter.h>
+#include <activemq/connector/stomp/StompCommandListener.h>
+#include <activemq/connector/stomp/StompSessionManager.h>
+#include <activemq/connector/stomp/commands/CommandConstants.h>
+#include <activemq/exceptions/IllegalArgumentException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+   
+    /**
+     * The connector implementation for the STOMP protocol.
+     */
+    class StompConnector
+    :
+        public Connector,
+        public transport::CommandListener,
+        public transport::TransportExceptionListener,
+        public StompCommandListener
+    {
+    private:
+    
+        // Flags the state we are in for connection to broker.
+        enum connectionState
+        {
+            DISCONNECTED,
+            CONNECTING,
+            CONNECTED
+        };
+
+        // Maps Command Ids to listener that are interested        
+        typedef std::map< commands::CommandConstants::CommandId, 
+                          StompCommandListener*> CmdListenerMap;
+        
+    private:
+    
+        /**
+         * The transport for sending/receiving commands on the wire.
+         */
+        transport::Transport* transport;
+        
+        /**
+         * Flag to indicate the start state of the connector.
+         */
+        connectionState state;
+        
+        /**
+         * Sync object.
+         */
+        concurrent::Mutex mutex;
+        
+        /**
+         * Observer of messages directed at a particular
+         * consumer.
+         */
+        ConsumerMessageListener* messageListener;
+        
+        /**
+         * Observer of connector exceptions.
+         */
+        cms::ExceptionListener* exceptionListener;
+        
+        /**
+         * This Connector's Command Reader
+         */
+        StompCommandReader reader;
+        
+        /**
+         * This Connector's Command Writer
+         */
+        StompCommandWriter writer;
+        
+        /**
+         * Map to hold StompCommandListeners
+         */
+        CmdListenerMap cmdListenerMap;
+        
+        /**
+         * Session Manager object that will  be allocated when we connect
+         */
+        StompSessionManager* sessionManager;
+        
+        /**
+         * Next avaliable Producer Id
+         */
+        unsigned int nextProducerId;
+        
+        /**
+         * Next avaliable Transaction Id
+         */
+        unsigned int nextTransactionId;
+        
+        /**
+         * Properties for the connector.
+         */
+        util::SimpleProperties properties;
+
+    private:
+    
+        /**
+         * Sends the connect message to the broker and
+         * waits for the response.
+         */
+        void connect(void);
+        
+        /**
+         * Sends a oneway disconnect message to the broker.
+         */
+        void disconnect(void);
+        
+        /**
+         * Fires a consumer message to the observer.
+         */
+        void fire( ConsumerInfo* consumer, core::ActiveMQMessage* msg ){
+            try{
+                if( messageListener != NULL ){
+                    messageListener->onConsumerMessage( 
+                        consumer,
+                        msg );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+        
+        /**
+         * Fires an exception event to the observing object.
+         */
+        void fire( const exceptions::ActiveMQException& ex ){
+            try{
+                if( exceptionListener != NULL ){
+                    exceptionListener->onException( ex );
+                }
+            }catch( ... ){/* do nothing*/}
+        }
+        
+    public:
+    
+        /**
+         * Constructor for the stomp connector.
+         * @param transport the transport object for sending/receiving
+         * commands on the wire.
+         * @param props properties for configuring the connector.
+         */
+        StompConnector( transport::Transport* transport, 
+                        const util::Properties& properties )
+            throw ( exceptions::IllegalArgumentException );
+
+        virtual ~StompConnector(void);
+        
+        /**
+         * Starts the service.
+         * @throws CMSException
+         */
+        virtual void start(void) throw( cms::CMSException );
+        
+        /**
+         * Closes this object and deallocates the appropriate resources.
+         * @throws CMSException
+         */
+        virtual void close(void) throw( cms::CMSException );
+
+        /**
+         * Gets the Client Id for this connection, if this
+         * connection has been closed, then this method returns ""
+         * @return Client Id String
+         */
+        virtual std::string getClientId(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_CLIENT_ID ), "" );
+        }
+        
+        virtual std::string getLogin(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_LOGIN ), "" );
+        }
+        
+        virtual std::string getPassword(void) const {
+            return properties.getProperty( 
+                commands::CommandConstants::toString( 
+                    commands::CommandConstants::HEADER_PASSWORD ), "" );
+        }
+
+        /**
+         * Gets a reference to the Transport that this connection
+         * is using.
+         * @param reference to a transport
+         * @throws InvalidStateException if the Transport is not set
+         */
+        virtual transport::Transport& getTransport(void) const 
+            throw (exceptions::InvalidStateException ) {
+
+            if( transport == NULL ) {
+                throw exceptions::InvalidStateException(
+                    __FILE__, __LINE__,
+                    "StompConnector::getTransport - "
+                    "Invalid State, No Transport.");
+            }
+            
+            return *transport;
+        }
+
+        /**
+         * Creates a Session Info object for this connector
+         * @param Acknowledgement Mode of the Session
+         * @returns Session Info Object
+         * @throws ConnectorException
+         */
+        virtual SessionInfo* createSession(
+            cms::Session::AcknowledgeMode ackMode) 
+                throw( ConnectorException );
+      
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createConsumer(
+            cms::Destination* destination, 
+            SessionInfo* session,
+            const std::string& selector = "")
+                throw ( ConnectorException );
+         
+        /** 
+         * Create a Durable Consumer for the given Session
+         * @param Topic to Subscribe to.
+         * @param Session Information.
+         * @param name of the Durable Topic
+         * @param Selector
+         * @param if set, inhibits the delivery of messages 
+         *        published by its own connection 
+         * @return Consumer Information
+         * @throws ConnectorException
+         */
+        virtual ConsumerInfo* createDurableConsumer(
+            cms::Topic* topic, 
+            SessionInfo* session,
+            const std::string& name,
+            const std::string& selector = "",
+            bool noLocal = false)
+                throw ( ConnectorException );
+
+        /** 
+         * Create a Consumer for the given Session
+         * @param Destination to Subscribe to.
+         * @param Session Information.
+         * @return Producer Information
+         * @throws ConnectorException
+         */
+        virtual ProducerInfo* createProducer(
+            cms::Destination* destination, 
+            SessionInfo* session)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a Topic given a name and session info
+         * @param Topic Name
+         * @param Session Information
+         * @return a newly created Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::Topic* createTopic( const std::string& name, 
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+          
+        /**
+         * Creates a Queue given a name and session info
+         * @param Queue Name
+         * @param Session Information
+         * @return a newly created Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::Queue* createQueue( const std::string& name, 
+                                         SessionInfo* session )
+            throw ( ConnectorException );
+
+        /**
+         * Creates a Temporary Topic given a name and session info
+         * @param Temporary Topic Name
+         * @param Session Information
+         * @return a newly created Temporary Topic Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryTopic* createTemporaryTopic(
+            SessionInfo* session)
+                throw ( ConnectorException );
+          
+        /**
+         * Creates a Temporary Queue given a name and session info
+         * @param Temporary Queue Name
+         * @param Session Information
+         * @return a newly created Temporary Queue Object
+         * @throws ConnectorException
+         */
+        virtual cms::TemporaryQueue* createTemporaryQueue(
+            SessionInfo* session)
+                throw ( ConnectorException );
+
+        /**
+         * Sends a Message
+         * @param The Message to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( cms::Message* message, ProducerInfo* producerInfo ) 
+            throw ( ConnectorException );
+      
+        /**
+         * Sends a set of Messages
+         * @param List of Messages to send.
+         * @param Producer Info for the sender of this message
+         * @throws ConnectorException
+         */
+        virtual void send( std::list<cms::Message*>& messages,
+                           ProducerInfo* producerInfo ) 
+            throw ( ConnectorException );
+         
+        /**
+         * Acknowledges a Message
+         * @param An ActiveMQMessage to Ack.
+         * @throws ConnectorException
+         */
+        virtual void acknowledge( const SessionInfo* session,
+                                  const cms::Message* message,
+                                  AckType ackType)
+            throw ( ConnectorException );
+
+        /**
+         * Starts a new Transaction.
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual TransactionInfo* startTransaction(
+            SessionInfo* session) 
+                throw ( ConnectorException );
+         
+        /**
+         * Commits a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void commit(TransactionInfo* transaction, 
+                            SessionInfo* session)
+            throw ( ConnectorException );
+
+        /**
+         * Rolls back a Transaction.
+         * @param The Transaction information
+         * @param Session Information
+         * @throws ConnectorException
+         */
+        virtual void rollback(TransactionInfo* transaction, 
+                              SessionInfo* session)
+            throw ( ConnectorException );
+
+        /**
+         * Creates a new Message.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::Message* createMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new BytesMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::BytesMessage* createBytesMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new TextMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::TextMessage* createTextMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /**
+         * Creates a new MapMessage.
+         * @param Session Information
+         * @param Transaction Info for this Message
+         * @throws ConnectorException
+         */
+        virtual cms::MapMessage* createMapMessage(
+            SessionInfo* session,
+            TransactionInfo* transaction)
+                throw ( ConnectorException );
+
+        /** 
+         * Unsubscribe from a givenDurable Subscription
+         * @param name of the Subscription
+         * @throws ConnectorException
+         */
+        virtual void unsubscribe( const std::string& name )
+            throw ( ConnectorException );
+
+        /**
+         * Destroys the given connector resource.
+         * @param resource the resource to be destroyed.
+         * @throws ConnectorException
+         */
+        virtual void destroyResource( ConnectorResource* resource )
+            throw ( ConnectorException );
+            
+        /** 
+         * Sets the listener of consumer messages.
+         * @param listener the observer.
+         */
+        virtual void setConsumerMessageListener(
+            ConsumerMessageListener* listener)
+        {
+            this->messageListener = listener;
+            
+            if(sessionManager != NULL)
+            {
+                sessionManager->setConsumerMessageListener( listener );
+            }
+        }
+
+        /** 
+         * Sets the Listner of exceptions for this connector
+         * @param ExceptionListener the observer.
+         */
+        virtual void setExceptionListener(
+            cms::ExceptionListener* listener)
+        {
+            this->exceptionListener = listener;
+        }
+        
+    public: // transport::CommandListener
+    
+        /**
+         * Event handler for the receipt of a non-response command from the 
+         * transport.
+         * @param command the received command object.
+         */
+        virtual void onCommand( transport::Command* command );
+        
+    public: // TransportExceptionListener
+
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException( 
+            transport::Transport* source, 
+            const exceptions::ActiveMQException& ex );
+
+    public: // StompCommandListener
+
+        /**
+         * Process the Stomp Command
+         * @param command to process
+         * @throw ConnterException
+         */
+        virtual void onStompCommand( commands::StompCommand* command ) 
+            throw ( StompConnectorException );    
+
+    public:
+    
+        /**
+         * Registers a Command Listener using the CommandId specified
+         * if there is already a listener for that command it will be
+         * removed.
+         * @param CommandId to process
+         * @param pointer to the listener to call
+         */
+        virtual void addCmdListener( 
+            commands::CommandConstants::CommandId commandId,
+            StompCommandListener* listener );
+        
+        /**
+         * UnRegisters a Command Listener using the CommandId specified
+         * @param CommandId of the listener to remove.
+         */
+        virtual void removeCmdListener( 
+            commands::CommandConstants::CommandId commandId );
+        
+    private:
+    
+        unsigned int getNextProducerId( void );
+        unsigned int getNextTransactionId( void );
+
+        // Check for Connected State and Throw an exception if not.
+        void enforceConnected( void ) throw ( ConnectorException );
+        
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOR_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorException.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOREXCEPTION_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOREXCEPTION_H_
+
+#include <activemq/connector/ConnectorException.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    /*
+     * Signals that an Connector exception of some sort has occurred.
+     */
+    class StompConnectorException : public connector::ConnectorException
+    {
+    public:
+   
+      StompConnectorException() {}
+      StompConnectorException( const exceptions::ActiveMQException& ex ){
+        *(ActiveMQException*)this = ex;
+      }
+      StompConnectorException( const StompConnectorException& ex ){
+        *(exceptions::ActiveMQException*)this = ex;
+      }
+      StompConnectorException(const char* file, const int lineNumber, 
+        const char* msg, ...)
+      {
+          va_list vargs ;
+          va_start(vargs, msg) ;
+          buildMessage(msg, vargs) ;
+            
+          // Set the first mark for this exception.
+          setMark( file, lineNumber );
+      }
+      
+      /**
+       * Clones this exception.  This is useful for cases where you need
+       * to preserve the type of the original exception as well as the message.
+       * All subclasses should override.
+       */
+      virtual exceptions::ActiveMQException* clone() const{
+          return new StompConnectorException( *this );
+      }
+      virtual ~StompConnectorException() {}
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTOREXCEPTION_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.cpp Mon Jul  3 04:51:36 2006
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+ 
+#include <activemq/connector/stomp/StompConnectorFactory.h>
+#include <activemq/connector/stomp/StompConnector.h>
+#include <activemq/connector/Connector.h>
+#include <activemq/transport/Transport.h>
+
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::transport;
+using namespace activemq::connector;
+using namespace activemq::connector::stomp;
+
+////////////////////////////////////////////////////////////////////////////////
+
+////////////////////////////////////////////////////////////////////////////////
+Connector* StompConnectorFactory::createConnector(
+    const activemq::util::Properties& properties,
+    activemq::transport::Transport* transport)
+{
+    return dynamic_cast<Connector*>(
+        new StompConnector(transport, properties));
+}
+
+////////////////////////////////////////////////////////////////////////////////
+ConnectorFactory& StompConnectorFactory::getInstance(void)
+{
+    // Create a static instance of the registrar and return a reference to
+    // its internal instance of this class.
+    static ConnectorFactoryMapRegistrar registrar(
+        "stomp", new StompConnectorFactory());
+
+    return registrar.getFactory();
+}

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConnectorFactory.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTORFACTORY_H_
+#define ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTORFACTORY_H_
+
+#include <activemq/connector/ConnectorFactory.h>
+#include <activemq/connector/ConnectorFactoryMapRegistrar.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompConnectorFactory : public connector::ConnectorFactory
+    {
+    private:
+
+
+    public:
+   
+        virtual ~StompConnectorFactory(void) {}
+   
+        /** 
+         * Creates a StompConnector
+         * @param The Properties that the new connector is configured with
+         */
+        virtual Connector* createConnector(
+            const activemq::util::Properties& properties,
+            activemq::transport::Transport* transport);
+
+        /**
+         * Returns an instance of this Factory by reference
+         * @return StompConnectorFactory reference
+         */
+        static ConnectorFactory& getInstance(void);
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_CONNECTOR_STOMP_STOMPCONNECTORFACTORY_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompConsumerInfo.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPCONSUMERINFO_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPCONSUMERINFO_H_
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    class StompConsumerInfo : public ConsumerInfo
+    {
+    private:
+
+        // Message Selector for this Consumer
+        std::string selector;
+        
+        // Consumer Id
+        unsigned int consumerId;
+        
+        // Destination
+        cms::Destination* destination;
+        
+        // Session Info - We do not own this
+        const SessionInfo* session;
+    
+    public:
+
+    	StompConsumerInfo(void) {
+            selector = "";
+            consumerId = 0;
+            destination = NULL;
+        }
+    	virtual ~StompConsumerInfo(void) { delete destination; }
+
+        /**
+         * Gets this message consumer's message selector expression.
+         * @return This Consumer's selector expression or "".
+         */
+        virtual const std::string& getMessageSelector(void) const {
+            return selector;
+        }
+        
+        /**
+         * Sets this message consumer's message selector expression.
+         * @param This Consumer's selector expression or "".
+         */
+        virtual void setMessageSelector( const std::string& selector ) {
+            this->selector = selector;
+        }
+
+        /**
+         * Gets the ID that is assigned to this consumer
+         * @return value of the Consumer Id.
+         */
+        virtual unsigned int getConsumerId(void) const {
+            return consumerId;
+        }
+        
+        /**
+         * Sets the ID that is assigned to this consumer
+         * @return string value of the Consumer Id.
+         */
+        virtual void setConsumerId( const unsigned int id ) {
+            this->consumerId = id;
+        }
+        
+        /**
+         * Gets the Destination that this Consumer is subscribed on
+         * @return Destination
+         */
+        virtual const cms::Destination& getDestination(void) const {
+            return *destination;
+        }
+        
+        /**
+         * Sets the destination that this Consumer is listening on
+         * @param Destination
+         */
+        virtual void setDestination( const cms::Destination& destination ) {
+            this->destination = destination.clone();
+        }
+        
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual const SessionInfo* getSessionInfo(void) const {
+            return session;
+        }
+        
+        /**
+         * Gets the Session Info that this consumer is attached too
+         * @return SessionnInfo pointer
+         */
+        virtual void setSessionInfo( const SessionInfo* session ) {
+            this->session = session;
+        }
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPCONSUMERINFO_H_*/

Added: incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h
URL: http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h?rev=418749&view=auto
==============================================================================
--- incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h (added)
+++ incubator/activemq/trunk/activemq-cpp/src/main/activemq/connector/stomp/StompDestination.h Mon Jul  3 04:51:36 2006
@@ -0,0 +1,106 @@
+/*
+ * Copyright 2006 The Apache Software Foundation or its licensors, as
+ * applicable.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_CONNECTOR_STOMP_STOMPDESTINATION_H_
+#define _ACTIVEMQ_CONNECTOR_STOMP_STOMPDESTINATION_H_
+
+#include <string>
+
+#include <cms/Destination.h>
+
+namespace activemq{
+namespace connector{
+namespace stomp{
+
+    /**
+     * Templatized Destination Class that bundles all the common aspects
+     * of a Stomp Destination into one class.  The template arguement is 
+     * one of Topic, Queue, TemporaryTopic, or TemporaryQueue.
+     */
+    template <typename T>
+    class StompDestination : public T
+    {
+    private:
+    
+        // Destination type
+        cms::Destination::DestinationType destType;
+        
+        // Name of the Destination
+        std::string name;
+        
+    public:
+
+    	StompDestination(void) {}
+        
+    	StompDestination( const std::string& name,
+                          cms::Destination::DestinationType type )
+        {
+            this->name = name;
+            this->destType = type;
+        }
+
+        virtual ~StompDestination(void) {}
+
+        /**
+         * Retrieves the name of this destination, plus the stomp
+         * destination decorator
+         * @return name
+         */
+        virtual std::string toProviderString(void) const {
+            return getPrefix() + name;
+        }
+        
+        /**
+         * Retrieve the Destination Type for this Destination
+         * @return The Destination Type
+         */
+        virtual cms::Destination::DestinationType getDestinationType(void) const {
+            return destType;
+        }
+        
+        /**
+         * Converts the Destination Name into a String minus the 
+         * stomp decorator
+         * @return string name
+         */
+        virtual std::string toString(void) const {
+            return name;
+        }
+
+        /**
+         * Copies the contents of the given Destinastion object to this one.
+         * @param source The source Destination object.
+         */
+        virtual void copy( const cms::Destination& source ) {
+            this->destType = source.getDestinationType();
+            this->name = source.toString();
+        }
+
+    protected:
+    
+        /**
+         * Retrieves the proper Stomp Prefix for the specified type
+         * of Destination
+         * @return string prefix
+         */
+        virtual std::string getPrefix(void) const = 0;
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_CONNECTOR_STOMP_STOMPDESTINATION_H_*/



Mime
View raw message