activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r733509 [2/33] - in /activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire: ./ marshal/ marshal/v1/ marshal/v2/ marshal/v3/ utils/
Date Sun, 11 Jan 2009 20:22:43 GMT
Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp Sun Jan 11 12:22:34 2009
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/wireformat/openwire/OpenWireFormatFactory.h>
+#include <activemq/wireformat/openwire/OpenWireFormat.h>
+
+#include <decaf/lang/Boolean.h>
+#include <decaf/lang/Integer.h>
+
+#include <memory>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::commands;
+using namespace activemq::transport;
+using namespace activemq::exceptions;
+using namespace activemq::wireformat;
+using namespace activemq::wireformat::openwire;
+using namespace decaf::lang;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds an OpenWireFormat whose preferred WireFormatInfo is filled in from
+ * the "wireFormat.*" entries of the given properties.
+ *
+ * Keys read here (default used when the key is absent):
+ *   wireFormat.version               ("3")
+ *   wireFormat.stackTraceEnabled     ("false")
+ *   wireFormat.cacheEnabled          ("false")
+ *   wireFormat.tcpNoDelayEnabled     ("false")
+ *   wireFormat.tightEncodingEnabled  ("false")
+ *   wireFormat.sizePrefixDisabled    ("false")
+ *
+ * @param properties - the Properties to read the settings from; the same
+ *        object is also passed to the OpenWireFormat constructor.
+ * @return a newly allocated OpenWireFormat, returned as a raw pointer.
+ * @throws IllegalStateException rethrown by AMQ_CATCH_RETHROW; anything else
+ *         is handled by AMQ_CATCHALL_THROW( IllegalStateException ).
+ */
+WireFormat* OpenWireFormatFactory::createWireFormat(
+    const decaf::util::Properties& properties )
+        throw ( decaf::lang::exceptions::IllegalStateException ) {
+
+    try{
+
+        // Keep the info object in an auto_ptr until it is handed over, so a
+        // failure while parsing the properties or while constructing the
+        // OpenWireFormat does not leak it.
+        std::auto_ptr<WireFormatInfo> info( new WireFormatInfo() );
+
+        // Configure the version to use
+        info->setVersion( Integer::parseInt(
+                properties.getProperty( "wireFormat.version", "3" ) ) );
+
+        // parse params out of the properties
+        info->setStackTraceEnabled( Boolean::parseBoolean(
+            properties.getProperty( "wireFormat.stackTraceEnabled",
+                                    "false" ) ) );
+        info->setCacheEnabled( Boolean::parseBoolean(
+            properties.getProperty( "wireFormat.cacheEnabled",
+                                    "false" ) ) );
+        info->setTcpNoDelayEnabled( Boolean::parseBoolean(
+            properties.getProperty( "wireFormat.tcpNoDelayEnabled",
+                                    "false" ) ) );
+        info->setTightEncodingEnabled( Boolean::parseBoolean(
+            properties.getProperty( "wireFormat.tightEncodingEnabled",
+                                    "false" ) ) );
+        info->setSizePrefixDisabled( Boolean::parseBoolean(
+            properties.getProperty( "wireFormat.sizePrefixDisabled",
+                                    "false" ) ) );
+
+        // Create the Openwire Format Object
+        std::auto_ptr<OpenWireFormat> format( new OpenWireFormat( properties ) );
+
+        // give the format object the ownership
+        format->setPreferedWireFormatInfo( info.release() );
+
+        return format.release();
+    }
+    AMQ_CATCH_RETHROW( IllegalStateException )
+    AMQ_CATCHALL_THROW( IllegalStateException )
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h Sun Jan 11 12:22:34 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATFACTORY_H_
+#define _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATFACTORY_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/wireformat/WireFormatFactory.h>
+#include <activemq/commands/WireFormatInfo.h>
+#include <decaf/lang/exceptions/IllegalStateException.h>
+#include <decaf/util/Properties.h>
+
+namespace activemq{
+namespace wireformat{
+namespace openwire{
+
+    /**
+     * WireFormatFactory implementation whose createWireFormat method is
+     * defined in OpenWireFormatFactory.cpp and returns an OpenWireFormat.
+     */
+    class AMQCPP_API OpenWireFormatFactory : public wireformat::WireFormatFactory {
+    public:
+
+        /**
+         * Constructor - has an empty body; the settings are read from the
+         * Properties passed to <code>createWireFormat</code>.
+         *
+         * URL options
+         * --------------------
+         * wireFormat.version
+         * wireFormat.stackTraceEnabled
+         * wireFormat.cacheEnabled
+         * wireFormat.tcpNoDelayEnabled
+         * wireFormat.tightEncodingEnabled
+         * wireFormat.sizePrefixDisabled
+         * wireFormat.maxInactivityDuration
+         *
+         * NOTE(review): createWireFormat does not read
+         * wireFormat.maxInactivityDuration itself; presumably it is consumed
+         * by OpenWireFormat, which receives the same Properties - confirm.
+         */
+        OpenWireFormatFactory() {}
+
+        virtual ~OpenWireFormatFactory() {}
+
+        /**
+         * Creates a new WireFormat Object passing it a set of
+         * properties from which it can obtain any optional settings
+         * @param properties - the Properties for this WireFormat
+         * @return the newly created WireFormat, as a raw pointer.
+         * @throws IllegalStateException if the WireFormat could not be
+         * created.
+         */
+        virtual wireformat::WireFormat* createWireFormat(
+            const decaf::util::Properties& properties )
+                throw ( decaf::lang::exceptions::IllegalStateException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATFACTORY_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatFactory.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp Sun Jan 11 12:22:34 2009
@@ -0,0 +1,270 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireFormatNegotiator.h"
+
+#include <activemq/commands/DataStructure.h>
+#include <activemq/commands/WireFormatInfo.h>
+#include <activemq/transport/IOTransport.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::exceptions;
+using namespace activemq::wireformat;
+using namespace activemq::wireformat::openwire;
+using namespace activemq::transport;
+using namespace activemq::commands;
+using namespace decaf::util::concurrent;
+using namespace decaf::lang::exceptions;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Wraps the given transport and remembers the wire format used for
+ * negotiation.  Both latches start at a count of one, the negotiator starts
+ * out closed, and the first-time flag is set.
+ *
+ * @param openWireFormat - the wire format object used during negotiation.
+ * @param next - the transport handed to the WireFormatNegotiator base.
+ * @param own - ownership flag handed to the WireFormatNegotiator base.
+ */
+OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
+                                                    Transport* next,
+                                                    bool own )
+  : WireFormatNegotiator( next, own ),
+    wireInfoSentDownLatch( 1 ),
+    readyCountDownLatch( 1 ),
+    openWireFormat( openWireFormat ),
+    closed( true ) {
+
+    // Nothing has been sent yet, so start() still has to do the handshake.
+    firstTime.set( true );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the negotiator on destruction.
+ *
+ * close() is declared to throw cms::CMSException; an exception leaving a
+ * destructor would terminate the program if it happened during stack
+ * unwinding, so any failure is contained here.
+ */
+OpenWireFormatNegotiator::~OpenWireFormatNegotiator()
+{
+    try {
+        // Close the transport and destroy it.
+        close();
+    } catch( ... ) {
+        // Deliberately ignored: nothing useful can be done with a close
+        // failure while the object is being destroyed.
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Passes a one-way command to the next transport after waiting on
+ * readyCountDownLatch for up to negotiationTimeout.
+ *
+ * @param command - the command handed on to next->oneway().
+ * @throws CommandIOException if this negotiator is closed, next is NULL, or
+ *         the wait on readyCountDownLatch returns false.
+ * @throws UnsupportedOperationException rethrown from the next transport.
+ */
+void OpenWireFormatNegotiator::oneway( Command* command )
+    throw( CommandIOException, UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::oneway - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            // Adjacent literals are joined by the compiler, so the " - "
+            // separator has to be part of the first literal.
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::oneway - "
+                "Wire format negotiation timeout: peer did not "
+                "send his wire format." );
+        }
+
+        next->oneway( command );
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Passes a request to the next transport after waiting on
+ * readyCountDownLatch for up to negotiationTimeout.
+ *
+ * @param command - the request handed on to next->request().
+ * @return whatever next->request( command ) returns.
+ * @throws CommandIOException if this negotiator is closed, next is NULL, or
+ *         the wait on readyCountDownLatch returns false.
+ * @throws UnsupportedOperationException rethrown from the next transport.
+ */
+Response* OpenWireFormatNegotiator::request( Command* command )
+    throw( CommandIOException, UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::request - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            // Adjacent literals are joined by the compiler, so the " - "
+            // separator has to be part of the first literal.
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::request - "
+                "Wire format negotiation timeout: peer did not "
+                "send his wire format." );
+        }
+
+        return next->request( command );
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Passes a request with a response timeout to the next transport after
+ * waiting on readyCountDownLatch for up to negotiationTimeout.
+ *
+ * @param command - the request handed on to next->request().
+ * @param timeout - forwarded unchanged to next->request(); it does not
+ *        affect the wait for negotiation.
+ * @return whatever next->request( command, timeout ) returns.
+ * @throws CommandIOException if this negotiator is closed, next is NULL, or
+ *         the wait on readyCountDownLatch returns false.
+ * @throws UnsupportedOperationException rethrown from the next transport.
+ */
+Response* OpenWireFormatNegotiator::request( Command* command, unsigned int timeout )
+    throw( CommandIOException, UnsupportedOperationException ) {
+
+    try{
+
+        if( closed || next == NULL ){
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::request - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            // Adjacent literals are joined by the compiler, so the " - "
+            // separator has to be part of the first literal.
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::request - "
+                "Wire format negotiation timeout: peer did not "
+                "send his wire format." );
+        }
+
+        return next->request( command, timeout );
+    }
+    AMQ_CATCH_RETHROW( UnsupportedOperationException )
+    AMQ_CATCH_RETHROW( CommandIOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
+    AMQ_CATCHALL_THROW( CommandIOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Handles a command arriving from the next transport.
+ *
+ * When the command is a DataStructure reporting the WireFormatInfo type, it
+ * is validated and handed to openWireFormat->renegotiateWireFormat() after
+ * waiting on wireInfoSentDownLatch; readyCountDownLatch is counted down
+ * whether or not that succeeds, and a failure is passed to fire( ex ).
+ * Every command, WireFormatInfo or not, is then passed to fire( command ).
+ *
+ * @param command - the command received from the next transport.
+ */
+void OpenWireFormatNegotiator::onCommand( Command* command ) {
+
+    DataStructure* dataStructure =
+        dynamic_cast<DataStructure*>( command );
+
+    if( dataStructure != NULL &&
+        dataStructure->getDataStructureType() == WireFormatInfo::ID_WIREFORMATINFO ) {
+
+        WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( dataStructure );
+
+        try {
+
+            // The type id matched, but the cast can still fail; report that
+            // instead of dereferencing a NULL pointer.
+            if( info == NULL ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "OpenWireFormatNegotiator::onCommand - "
+                    "Command reports the WireFormatInfo type but is not a WireFormatInfo" );
+            }
+
+            if( !info->isValid() ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "OpenWireFormatNegotiator::onCommand - "
+                    "Remote wire format magic is invalid" );
+            }
+
+            // The result of this wait is not checked: renegotiation goes
+            // ahead even if the latch was not released in time.
+            wireInfoSentDownLatch.await( negotiationTimeout );
+            openWireFormat->renegotiateWireFormat( info );
+
+            readyCountDownLatch.countDown();
+
+        } catch( exceptions::ActiveMQException& ex ) {
+
+            // Release anyone blocked in oneway()/request() before reporting.
+            readyCountDownLatch.countDown();
+            fire( ex );
+        }
+    }
+
+    // Send along to the next interested party.
+    fire( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Handles an exception reported by a transport: counts down
+ * readyCountDownLatch and then passes the exception to fire().
+ *
+ * @param source - the reporting transport; not used here.
+ * @param ex - the exception passed on to fire().
+ */
+void OpenWireFormatNegotiator::onTransportException(
+    Transport* source AMQCPP_UNUSED,
+    const decaf::lang::Exception& ex ) {
+
+    // Count the latch down first so that callers waiting on it in
+    // oneway()/request() are released.
+    readyCountDownLatch.countDown();
+    fire( ex );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Starts the negotiator.
+ *
+ * Returns immediately when the negotiator is not closed.  Otherwise it
+ * checks that both listeners, the next transport and the wire format are
+ * set, starts the next transport and, the first time through, sends the
+ * preferred WireFormatInfo and counts down wireInfoSentDownLatch.  The
+ * negotiator is marked open only when all of that succeeded.
+ *
+ * @throws CMSException if a required collaborator is missing or sending the
+ *         preferred WireFormatInfo fails.
+ */
+void OpenWireFormatNegotiator::start() throw( cms::CMSException ){
+
+    /**
+     * We're already started.
+     */
+    if( !closed ){
+        return;
+    }
+
+    if( commandlistener == NULL ){
+        throw exceptions::ActiveMQException(
+            __FILE__, __LINE__,
+            "OpenWireFormatNegotiator::start - "
+            "commandListener is invalid" );
+    }
+
+    if( exceptionListener == NULL ){
+        throw exceptions::ActiveMQException(
+            __FILE__, __LINE__,
+            "OpenWireFormatNegotiator::start - "
+            "exceptionListener is invalid" );
+    }
+
+    if( next == NULL ){
+        throw exceptions::ActiveMQException(
+            __FILE__, __LINE__,
+            "OpenWireFormatNegotiator::start - "
+            "next transport is NULL" );
+    }
+
+    if( openWireFormat == NULL ){
+        throw exceptions::ActiveMQException(
+            __FILE__, __LINE__,
+            "OpenWireFormatNegotiator::start - "
+            "openWireFormat is NULL" );
+    }
+
+    // Start the delegate transport object.
+    next->start();
+
+    if( firstTime.compareAndSet( true, false ) ) {
+
+        try {
+
+            // Circumvent all other Transport filters and go straight for the base
+            // IOTransport, this should guarantee that there's no funny business done
+            // like async dispatch etc.  If it can't be found just use next and hope that
+            // there's nothing that will break the necessary thread locking that protects
+            // the message as it marshaled out to the wire
+            Transport* transport = this->next->narrow( typeid( transport::IOTransport ) );
+            if( transport == NULL ) {
+                transport = this->next;
+            }
+
+            // We first send the WireFormat that we'd prefer.
+            transport->oneway( openWireFormat->getPreferedWireFormatInfo() );
+
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();
+
+        } catch( ActiveMQException& ex ) {
+
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();
+            ex.setMark( __FILE__, __LINE__ );
+
+            // Rethrow the original object; "throw ex" would copy it as a
+            // plain ActiveMQException and lose the derived type.
+            throw;
+        }
+    }
+
+    // Mark it as open.
+    closed = false;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Closes the next transport when this negotiator is currently open, then
+ * marks the negotiator closed.  Calling it on a closed negotiator does
+ * nothing.
+ *
+ * @throws CMSException propagated from next->close(); the negotiator is
+ *         left open in that case.
+ */
+void OpenWireFormatNegotiator::close() throw( cms::CMSException ){
+
+    // Already closed: the flag is set and there is nothing to shut down.
+    if( closed ){
+        return;
+    }
+
+    if( next != NULL ){
+        next->close();
+    }
+
+    closed = true;
+}
+

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h Sun Jan 11 12:22:34 2009
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_
+#define _ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/transport/TransportFilter.h>
+#include <activemq/transport/Command.h>
+#include <activemq/wireformat/openwire/OpenWireFormat.h>
+#include <activemq/wireformat/WireFormatNegotiator.h>
+#include <decaf/util/concurrent/Mutex.h>
+#include <decaf/util/concurrent/CountDownLatch.h>
+#include <decaf/util/concurrent/Concurrent.h>
+#include <decaf/util/concurrent/atomic/AtomicBoolean.h>
+
+namespace activemq{
+namespace wireformat{
+namespace openwire{
+
+    /**
+     * WireFormatNegotiator that holds back outbound commands until the
+     * WireFormatInfo exchange with the peer has completed or the negotiation
+     * timeout has expired.
+     */
+    class AMQCPP_API OpenWireFormatNegotiator : public wireformat::WireFormatNegotiator {
+    private:
+
+        /**
+         * Time to wait before we declare that the negotiation has timed out.
+         * Passed as-is to CountDownLatch::await; presumably milliseconds -
+         * TODO confirm against CountDownLatch.
+         */
+        static const int negotiationTimeout = 15000;
+
+        /**
+         * True until start() has sent our preferred WireFormatInfo once.
+         */
+        decaf::util::concurrent::atomic::AtomicBoolean firstTime;
+
+        /**
+         * wireInfoSentDownLatch is counted down by start() once it has tried
+         * to send our preferred WireFormatInfo.  readyCountDownLatch is
+         * counted down once the peer's WireFormatInfo has been handled or a
+         * transport exception was reported; oneway() and request() wait on it.
+         */
+        decaf::util::concurrent::CountDownLatch wireInfoSentDownLatch;
+        decaf::util::concurrent::CountDownLatch readyCountDownLatch;
+
+        /**
+         * The OpenWireFormat object that we use in negotiation.
+         */
+        OpenWireFormat* openWireFormat;
+
+        /**
+         * True while this negotiator is closed; it is constructed closed and
+         * start() clears the flag.
+         */
+        bool closed;
+
+    public:
+
+        /**
+         * Constructor - Initializes this object around another Transport
+         * @param openWireFormat - The WireFormat object we use to negotiate
+         * @param next - The next transport in the chain
+         * @param own - do we own the Transport pointer.
+         */
+        OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
+                                  transport::Transport* next,
+                                  bool own = true );
+
+        /**
+         * Calls close().
+         */
+        virtual ~OpenWireFormatNegotiator();
+
+        /**
+         * Sends a one-way command.  Does not wait for any response from the
+         * broker.
+         * First waits for the WireFormatInfo exchange to happen so that we
+         * know how to encode outbound data.
+         * @param command the command to be sent.
+         * @throws CommandIOException if an exception occurs during writing of
+         * the command, if this transport is closed, or if the negotiation
+         * wait times out.
+         * @throws UnsupportedOperationException if this method is not implemented
+         * by this transport.
+         */
+        virtual void oneway( transport::Command* command )
+            throw( transport::CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
+         * Sends the given request to the server and waits for the response.
+         * First waits for the WireFormatInfo exchange to happen so that we
+         * know how to encode outbound data.
+         * @param command The request to send.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request, if
+         * this transport is closed, or if the negotiation wait times out.
+         * @throws UnsupportedOperationException if the next transport reports
+         * that the operation is not supported.
+         */
+        virtual transport::Response* request( transport::Command* command )
+            throw( transport::CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
+         * Sends the given request to the server and waits for the response.
+         * First waits for the WireFormatInfo exchange to happen so that we
+         * know how to encode outbound data.
+         * @param command The request to send.
+         * @param timeout The time to wait for the response; it is passed on
+         * to the next transport and does not affect the negotiation wait.
+         * @return the response from the server.
+         * @throws CommandIOException if an error occurs with the request, if
+         * this transport is closed, or if the negotiation wait times out.
+         * @throws UnsupportedOperationException if the next transport reports
+         * that the operation is not supported.
+         */
+        virtual transport::Response* request( transport::Command* command, unsigned int timeout )
+            throw( transport::CommandIOException,
+                   decaf::lang::exceptions::UnsupportedOperationException );
+
+        /**
+         * Handles a command received from the nested transport.  A
+         * WireFormatInfo command is validated and used to renegotiate the
+         * wire format, after which callers waiting in oneway() / request()
+         * are released.  Every command, WireFormatInfo included, is then
+         * passed on to the command listener.
+         * @param command the command received from the nested transport.
+         */
+        virtual void onCommand( transport::Command* command );
+
+        /**
+         * Event handler for an exception from a command transport.  Releases
+         * callers waiting for the negotiation and passes the exception on.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException( transport::Transport* source,
+                                           const decaf::lang::Exception& ex );
+
+        /**
+         * Starts this negotiator.  Does nothing when it is already started.
+         * Otherwise the command listener, the exception listener, the next
+         * transport and the wire format must all be set; the next transport
+         * is started and, on the first call, our preferred WireFormatInfo is
+         * sent to the peer.
+         * @throws CMSException if a required object is missing or the
+         * preferred WireFormatInfo cannot be sent.
+         */
+        virtual void start() throw( cms::CMSException );
+
+        /**
+         * Closes the next transport if this negotiator is currently open and
+         * marks the negotiator as closed.  This can be called explicitly,
+         * but is also called in the destructor.
+         * @throws CMSException if errors occur.
+         */
+        virtual void close() throw( cms::CMSException );
+
+    };
+
+}}}
+
+#endif /*_ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIREFORMATNEGOTIATOR_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireFormatNegotiator.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp Sun Jan 11 12:22:34 2009
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "OpenWireResponseBuilder.h"
+
+#include <typeinfo>
+#include <activemq/commands/ActiveMQBytesMessage.h>
+#include <activemq/commands/ActiveMQMapMessage.h>
+#include <activemq/commands/ActiveMQMessage.h>
+#include <activemq/commands/ActiveMQObjectMessage.h>
+#include <activemq/commands/ActiveMQStreamMessage.h>
+#include <activemq/commands/ActiveMQTextMessage.h>
+#include <activemq/commands/BrokerInfo.h>
+#include <activemq/commands/ConnectionInfo.h>
+#include <activemq/commands/ConsumerInfo.h>
+#include <activemq/commands/DestinationInfo.h>
+#include <activemq/commands/ProducerInfo.h>
+#include <activemq/commands/Response.h>
+#include <activemq/commands/RemoveSubscriptionInfo.h>
+#include <activemq/commands/SessionInfo.h>
+#include <activemq/commands/ShutdownInfo.h>
+#include <activemq/commands/WireFormatInfo.h>
+
+using namespace activemq;
+using namespace activemq::wireformat;
+using namespace activemq::wireformat::openwire;
+using namespace activemq::transport;
+using namespace activemq::transport::mock;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds the reply for a command.
+ *
+ * For the ActiveMQ message types and the Connection, Consumer, Destination,
+ * Producer, RemoveSubscription and Session info commands a plain Response is
+ * created whose correlation id is the command's id.
+ *
+ * @param command - the command to answer.
+ * @return a new Response for the recognized types, or NULL for any other
+ *         command that does not require a response.
+ * @throws CommandIOException for an unrecognized command that does require
+ *         a response.
+ */
+Response* OpenWireResponseBuilder::buildResponse(
+    const transport::Command* command ){
+
+    // Look the dynamic type up once and compare it against each known type.
+    const std::type_info& actual = typeid( *command );
+
+    const bool isMessage =
+        actual == typeid( commands::ActiveMQBytesMessage ) ||
+        actual == typeid( commands::ActiveMQMapMessage ) ||
+        actual == typeid( commands::ActiveMQMessage ) ||
+        actual == typeid( commands::ActiveMQObjectMessage ) ||
+        actual == typeid( commands::ActiveMQStreamMessage ) ||
+        actual == typeid( commands::ActiveMQTextMessage );
+
+    const bool isInfo =
+        actual == typeid( commands::ConnectionInfo ) ||
+        actual == typeid( commands::ConsumerInfo ) ||
+        actual == typeid( commands::DestinationInfo ) ||
+        actual == typeid( commands::ProducerInfo ) ||
+        actual == typeid( commands::RemoveSubscriptionInfo ) ||
+        actual == typeid( commands::SessionInfo );
+
+    if( !( isMessage || isInfo ) ) {
+
+        // An unknown command that wants an answer cannot be served.
+        if( command->isResponseRequired() ) {
+            throw transport::CommandIOException( __FILE__, __LINE__,
+                "OpenWireResponseBuilder - unrecognized command" );
+        }
+
+        return NULL;
+    }
+
+    // A reply carrying the id of the command is all these types need.
+    commands::Response* reply = new commands::Response();
+    reply->setCorrelationId( command->getCommandId() );
+    return reply;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Queues the commands to be delivered in reaction to the given command.
+ *
+ * A command that requires a response gets the result of buildResponse()
+ * queued; a WireFormatInfo command additionally gets a clone of itself
+ * queued.
+ *
+ * @param command - the command being reacted to.
+ * @param queue - receives the generated commands, response first.
+ */
+void OpenWireResponseBuilder::buildIncomingCommands(
+    const transport::Command* command, decaf::util::Queue<transport::Command*>& queue ){
+
+    const bool wantsResponse = command->isResponseRequired();
+
+    // Delegate this to buildResponse
+    if( wantsResponse ) {
+        queue.push( buildResponse( command ) );
+    }
+
+    const std::type_info& actual = typeid( *command );
+
+    if( actual == typeid( commands::WireFormatInfo ) ) {
+
+        // Echo back a copy of the caller's own WireFormatInfo so that they
+        // get exactly the settings they asked for.
+        queue.push( command->cloneCommand() );
+    }
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.cpp
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h Sun Jan 11 12:22:34 2009
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIRERESPONSEBUILDER_H_
+#define ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIRERESPONSEBUILDER_H_
+
+#include <activemq/util/Config.h>
+#include <activemq/transport/mock/MockTransport.h>
+
+namespace activemq{
+namespace wireformat{
+namespace openwire{
+
+    /**
+     * MockTransport::ResponseBuilder implementation that produces the
+     * replies for OpenWire commands.
+     */
+    class AMQCPP_API OpenWireResponseBuilder :
+        public transport::mock::MockTransport::ResponseBuilder{
+    public:
+
+        OpenWireResponseBuilder() {}
+        virtual ~OpenWireResponseBuilder() {}
+
+        /**
+         * Builds the response for the given command.
+         * @param command - the command to answer.
+         * @return a new Response whose correlation id is the command's id
+         * for the recognized command types, or NULL for any other command
+         * that does not require a response.
+         * @throws CommandIOException if an unrecognized command requires a
+         * response.
+         */
+        virtual transport::Response* buildResponse( const transport::Command* command );
+
+        /**
+         * Pushes the commands generated in reaction to the given command
+         * onto the queue: the response, when one is required, followed by a
+         * clone of the command itself when it is a WireFormatInfo.
+         * @param command - the command being reacted to.
+         * @param queue - the queue that receives the generated commands.
+         */
+        virtual void buildIncomingCommands(
+            const transport::Command* command,
+            decaf::util::Queue<transport::Command*>& queue );
+
+    };
+
+}}}
+
+#endif /*ACTIVEMQ_WIREFORMAT_OPENWIRE_OPENWIRERESPONSEBUILDER_H_*/

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/OpenWireResponseBuilder.h
------------------------------------------------------------------------------
    svn:eol-style = native

Added: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp?rev=733509&view=auto
==============================================================================
--- activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp (added)
+++ activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp Sun Jan 11 12:22:34 2009
@@ -0,0 +1,833 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.h>
+#include <activemq/wireformat/openwire/utils/HexTable.h>
+#include <activemq/commands/MessageId.h>
+#include <activemq/commands/ProducerId.h>
+#include <activemq/commands/TransactionId.h>
+#include <activemq/commands/LocalTransactionId.h>
+#include <activemq/commands/XATransactionId.h>
+#include <activemq/commands/BrokerError.h>
+#include <activemq/exceptions/ActiveMQException.h>
+#include <decaf/lang/Long.h>
+#include <decaf/lang/Integer.h>
+#include <activemq/util/Config.h>
+
+using namespace std;
+using namespace activemq;
+using namespace activemq::util;
+using namespace activemq::exceptions;
+using namespace activemq::commands;
+using namespace activemq::wireformat;
+using namespace activemq::wireformat::openwire;
+using namespace activemq::wireformat::openwire::marshal;
+using namespace activemq::wireformat::openwire::utils;
+using namespace decaf::io;
+using namespace decaf::util;
+using namespace decaf::lang;
+
+////////////////////////////////////////////////////////////////////////////////
+// Definition of the static lookup table that toHexFromBytes() indexes by byte value.
+utils::HexTable BaseDataStreamMarshaller::hexTable;
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Tight-encoding read of a "cached" object.  No caching is done here; the
+ * call is forwarded to the wire format's nested-object reader.
+ *
+ * @param wireFormat - wire format that performs the unmarshal.
+ * @param dataIn - stream the object is read from.
+ * @param bs - boolean stream handed through to the wire format.
+ * @return whatever the wire format's nested-object reader returns.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::tightUnmarshalCachedObject(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs ) throw ( decaf::io::IOException ) {
+
+    try {
+        commands::DataStructure* nested =
+            wireFormat->tightUnmarshalNestedObject( dataIn, bs );
+        return nested;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * First (sizing) pass of tight marshalling for a "cached" object.  The work
+ * is forwarded unchanged to the wire format's nested-object first pass.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param data - the object being marshalled.
+ * @param bs - boolean stream handed through to the wire format.
+ * @return the value returned by the wire format's first pass.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+int BaseDataStreamMarshaller::tightMarshalCachedObject1(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        const int nestedSize = wireFormat->tightMarshalNestedObject1( data, bs );
+        return nestedSize;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Second (writing) pass of tight marshalling for a "cached" object.  The
+ * work is forwarded unchanged to the wire format's nested-object second pass.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param data - the object being marshalled.
+ * @param dataOut - stream the object is written to.
+ * @param bs - boolean stream handed through to the wire format.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::tightMarshalCachedObject2(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    decaf::io::DataOutputStream* dataOut,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        // Nothing cache specific happens here, the nested path does the writing.
+        wireFormat->tightMarshalNestedObject2(
+            data, dataOut, bs );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Loose-encoding write of a "cached" object.  The call is forwarded
+ * unchanged to the wire format's loose nested-object writer.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param data - the object being marshalled.
+ * @param dataOut - stream the object is written to.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::looseMarshalCachedObject(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    decaf::io::DataOutputStream* dataOut )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        // Nothing cache specific happens here, the nested path does the writing.
+        wireFormat->looseMarshalNestedObject(
+            data, dataOut );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Loose-encoding read of a "cached" object.  The call is forwarded unchanged
+ * to the wire format's loose nested-object reader.
+ *
+ * @param wireFormat - wire format that performs the unmarshal.
+ * @param dataIn - stream the object is read from.
+ * @return whatever the wire format's nested-object reader returns.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::looseUnmarshalCachedObject(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn ) throw ( decaf::io::IOException ) {
+
+    try {
+        commands::DataStructure* nested =
+            wireFormat->looseUnmarshalNestedObject( dataIn );
+        return nested;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * First (sizing) pass of tight marshalling for a nested object; a thin
+ * wrapper over the wire format's method of the same name.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param object - the nested object being marshalled.
+ * @param bs - boolean stream handed through to the wire format.
+ * @return the value returned by the wire format's first pass.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+int BaseDataStreamMarshaller::tightMarshalNestedObject1(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* object,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        const int nestedSize = wireFormat->tightMarshalNestedObject1( object, bs );
+        return nestedSize;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Second (writing) pass of tight marshalling for a nested object; a thin
+ * wrapper over the wire format's method of the same name.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param object - the nested object being marshalled.
+ * @param dataOut - stream the object is written to.
+ * @param bs - boolean stream handed through to the wire format.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::tightMarshalNestedObject2(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* object,
+    decaf::io::DataOutputStream* dataOut,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        // The wire format owns the actual encoding logic.
+        wireFormat->tightMarshalNestedObject2(
+            object, dataOut, bs );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Tight-encoding read of a nested object; a thin wrapper over the wire
+ * format's method of the same name.
+ *
+ * @param wireFormat - wire format that performs the unmarshal.
+ * @param dataIn - stream the object is read from.
+ * @param bs - boolean stream handed through to the wire format.
+ * @return whatever the wire format's nested-object reader returns.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::tightUnmarshalNestedObject(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        commands::DataStructure* nested =
+            wireFormat->tightUnmarshalNestedObject( dataIn, bs );
+        return nested;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Loose-encoding read of a nested object; a thin wrapper over the wire
+ * format's method of the same name.
+ *
+ * @param wireFormat - wire format that performs the unmarshal.
+ * @param dataIn - stream the object is read from.
+ * @return whatever the wire format's nested-object reader returns.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::looseUnmarshalNestedObject(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        commands::DataStructure* nested =
+            wireFormat->looseUnmarshalNestedObject( dataIn );
+        return nested;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Loose-encoding write of a nested object; a thin wrapper over the wire
+ * format's method of the same name.
+ *
+ * @param wireFormat - wire format that performs the marshal.
+ * @param object - the nested object being marshalled.
+ * @param dataOut - stream the object is written to.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::looseMarshalNestedObject(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* object,
+    decaf::io::DataOutputStream* dataOut )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        // The wire format owns the actual encoding logic.
+        wireFormat->looseMarshalNestedObject(
+            object, dataOut );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a string that was written in the tight encoding.  Up to two flags
+ * are consumed from the boolean stream: the first says whether a string is
+ * present at all, the second (read only when one is present) selects
+ * between the ASCII reader and readUTF().
+ *
+ * @param dataIn - stream the string payload is read from.
+ * @param bs - boolean stream holding the presence / encoding flags.
+ * @return the decoded string, or an empty string when none was written.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::string BaseDataStreamMarshaller::tightUnmarshalString(
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs ) throw ( decaf::io::IOException ) {
+
+    try {
+
+        // First flag: is there a string on the wire?
+        if( !bs->readBoolean() ) {
+            return "";
+        }
+
+        // Second flag: was it written as plain ASCII?
+        if( bs->readBoolean() ) {
+            return this->readAsciiString( dataIn );
+        }
+
+        return dataIn->readUTF();
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * First (sizing) pass of tight string marshalling.  Writes a presence flag
+ * to the boolean stream and, for a non-empty string, a second flag telling
+ * whether every character is in the range 0x01-0x7F.
+ *
+ * @param value - the string to be marshalled.
+ * @param bs - boolean stream that receives the flags.
+ * @return 0 for an empty string, otherwise the computed encoded length
+ *         plus 2 (the size of the length prefix).
+ * @throws IOException if the computed length is 0x10000 or more; this is
+ *         also the only exception type this method lets escape.
+ */
+int BaseDataStreamMarshaller::tightMarshalString1( const std::string& value,
+                                                   utils::BooleanStream* bs )
+                                                    throw ( decaf::io::IOException ) {
+    try {
+
+        const bool hasValue = ( value != "" );
+        bs->writeBoolean( hasValue );
+
+        if( !hasValue ) {
+            return 0;
+        }
+
+        const size_t charCount = value.length();
+        int encodedLength = 0;
+        bool asciiOnly = true;
+
+        for( size_t pos = 0; pos < charCount; ++pos ) {
+
+            const int ch = value[pos];
+
+            // Characters 0x01-0x7F take a single byte.
+            if( ( ch >= 0x0001 ) && ( ch <= 0x007F ) ) {
+                ++encodedLength;
+                continue;
+            }
+
+            // Everything else is counted as a multi-byte sequence.
+            asciiOnly = false;
+            encodedLength += ( ch > 0x07FF ) ? 3 : 2;
+        }
+
+        if( encodedLength >= 0x10000 ) {
+            throw IOException(
+                __FILE__, __LINE__,
+                "BaseDataStreamMarshaller::tightMarshalString1 - "
+                "Encountered a String value that is too long to encode." );
+        }
+
+        bs->writeBoolean( asciiOnly );
+
+        // Two extra bytes account for the length prefix.
+        return encodedLength + 2;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( ActiveMQException, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Second (writing) pass of tight string marshalling.  Consumes the flags
+ * that tightMarshalString1 stored in the boolean stream and writes the
+ * string payload in the matching form.
+ *
+ * The non-ASCII branch writes with writeUTF() so that it pairs with the
+ * readUTF() call in tightUnmarshalString and with the "length + 2" size
+ * that the first pass reserves.  It previously called writeChars(), which
+ * the reader cannot decode.
+ *
+ * @param value - the string to be marshalled.
+ * @param dataOut - stream the payload is written to.
+ * @param bs - boolean stream holding the flags written by the first pass.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::tightMarshalString2(
+    const std::string& value,
+    decaf::io::DataOutputStream* dataOut,
+    utils::BooleanStream* bs ) throw ( decaf::io::IOException ) {
+
+    try{
+
+        // First flag: nothing is written for an empty string.
+        if( bs->readBoolean() ) {
+
+            // If we verified it only holds ascii values
+            if( bs->readBoolean() ) {
+                // Layout expected by readAsciiString: 16 bit length, raw bytes.
+                dataOut->writeShort( (short)value.length() );
+                dataOut->writeBytes( value );
+            } else {
+                // Must mirror the readUTF() used on the unmarshal side.
+                dataOut->writeUTF( value );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes a string in the loose encoding: a boolean presence flag followed,
+ * for a non-empty string, by the value written with writeUTF().
+ *
+ * @param value - the string to be marshalled.
+ * @param dataOut - stream the flag and payload are written to.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::looseMarshalString(
+    const std::string value,
+    decaf::io::DataOutputStream* dataOut ) throw ( decaf::io::IOException ) {
+
+    try {
+
+        const bool hasValue = ( value != "" );
+
+        dataOut->writeBoolean( hasValue );
+
+        if( hasValue ) {
+            dataOut->writeUTF( value );
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a string written in the loose encoding: a boolean presence flag
+ * followed, when the flag is set, by a value read with readUTF().
+ *
+ * @param dataIn - stream the flag and payload are read from.
+ * @return the decoded string, or an empty string when the flag is clear.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::string BaseDataStreamMarshaller::looseUnmarshalString(
+    decaf::io::DataInputStream* dataIn ) throw ( decaf::io::IOException ) {
+
+    try {
+
+        // A clear flag means no string follows.
+        if( !dataIn->readBoolean() ) {
+            return "";
+        }
+
+        return dataIn->readUTF();
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * First (sizing) pass of tight marshalling for a 64 bit value.  Two flags
+ * are written to the boolean stream to record how many bytes the second
+ * pass will emit:
+ *
+ *   false,false -> 0 bytes (value is zero)
+ *   false,true  -> 2 bytes (only the low 16 bits are set)
+ *   true,false  -> 4 bytes (only the low 32 bits are set)
+ *   true,true   -> 8 bytes
+ *
+ * @param wireFormat - unused.
+ * @param value - the value to be marshalled.
+ * @param bs - boolean stream that receives the two flags.
+ * @return the number of payload bytes: 0, 2, 4 or 8.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+int BaseDataStreamMarshaller::tightMarshalLong1( OpenWireFormat* wireFormat AMQCPP_UNUSED,
+                                                 long long value,
+                                                 utils::BooleanStream* bs )
+                                                    throw ( decaf::io::IOException ) {
+
+    try {
+
+        if( value == 0L ) {
+            bs->writeBoolean( false );
+            bs->writeBoolean( false );
+            return 0;
+        }
+
+        // Inspect the bit pattern as unsigned so the masks behave uniformly.
+        const unsigned long long bits = (unsigned long long)value;
+        const bool fitsIn16Bits = ( bits & 0xFFFFFFFFFFFF0000ULL ) == 0ULL;
+        const bool fitsIn32Bits = ( bits & 0xFFFFFFFF00000000ULL ) == 0ULL;
+
+        if( fitsIn16Bits ) {
+            bs->writeBoolean( false );
+            bs->writeBoolean( true );
+            return 2;
+        }
+
+        if( fitsIn32Bits ) {
+            bs->writeBoolean( true );
+            bs->writeBoolean( false );
+            return 4;
+        }
+
+        bs->writeBoolean( true );
+        bs->writeBoolean( true );
+        return 8;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Second (writing) pass of tight marshalling for a 64 bit value.  Reads two
+ * flags from the boolean stream and writes the value as a long, an int, a
+ * short, or not at all, depending on the flag combination.
+ *
+ * @param wireFormat - unused.
+ * @param value - the value to be marshalled.
+ * @param dataOut - stream the payload is written to.
+ * @param bs - boolean stream the two size flags are read from.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::tightMarshalLong2( OpenWireFormat* wireFormat AMQCPP_UNUSED,
+                                                  long long value,
+                                                  decaf::io::DataOutputStream* dataOut,
+                                                  utils::BooleanStream* bs )
+                                                    throw ( decaf::io::IOException ) {
+    try {
+
+        // Both flags are always consumed, in this order.
+        const bool firstFlag = bs->readBoolean();
+        const bool secondFlag = bs->readBoolean();
+
+        if( firstFlag && secondFlag ) {
+            dataOut->writeLong( value );
+        } else if( firstFlag ) {
+            dataOut->writeInt( (int)value );
+        } else if( secondFlag ) {
+            dataOut->writeShort( (short)value );
+        }
+        // false,false: nothing is written.
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a 64 bit value written in the tight encoding.  Two flags from the
+ * boolean stream select how many bytes are read: a long, an int (taken as
+ * unsigned), an unsigned short, or nothing at all (value 0).
+ *
+ * @param wireFormat - unused.
+ * @param dataIn - stream the payload is read from.
+ * @param bs - boolean stream the two size flags are read from.
+ * @return the decoded value.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+long long BaseDataStreamMarshaller::tightUnmarshalLong(
+    OpenWireFormat* wireFormat AMQCPP_UNUSED,
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // Both flags are always consumed, in this order.
+        const bool firstFlag = bs->readBoolean();
+        const bool secondFlag = bs->readBoolean();
+
+        if( firstFlag && secondFlag ) {
+            return dataIn->readLong();
+        }
+
+        if( firstFlag ) {
+            // Zero-extend: the 4 byte form is only used for values whose
+            // upper 32 bits are clear.
+            return (unsigned int)dataIn->readInt();
+        }
+
+        if( secondFlag ) {
+            return dataIn->readUnsignedShort();
+        }
+
+        return 0;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes a 64 bit value in the loose encoding, i.e. always as a full long.
+ *
+ * @param wireFormat - unused.
+ * @param value - the value to be written.
+ * @param dataOut - stream the value is written to.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::looseMarshalLong( OpenWireFormat* wireFormat AMQCPP_UNUSED,
+                                                 long long value,
+                                                 decaf::io::DataOutputStream* dataOut )
+                                                    throw ( decaf::io::IOException ) {
+
+    try {
+        // No size flags in the loose form.
+        dataOut->writeLong(
+            value );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a 64 bit value in the loose encoding, i.e. always as a full long.
+ *
+ * @param wireFormat - unused.
+ * @param dataIn - stream the value is read from.
+ * @return the value read.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+long long BaseDataStreamMarshaller::looseUnmarshalLong(
+    OpenWireFormat* wireFormat AMQCPP_UNUSED,
+    decaf::io::DataInputStream* dataIn )
+        throw ( decaf::io::IOException ) {
+
+    try {
+        const long long decoded = dataIn->readLong();
+        return decoded;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a BrokerError written in the tight encoding.  A flag from the
+ * boolean stream says whether an error is present.  When the wire format
+ * has stack traces enabled, the stack trace elements and a recursively
+ * encoded cause are read as well.
+ *
+ * NOTE(review): the objects created with new in this method are not deleted
+ * on the path where a later read throws.
+ *
+ * @param wireFormat - consulted for the stack-trace-enabled setting.
+ * @param dataIn - stream the payload is read from.
+ * @param bs - boolean stream holding the presence and string flags.
+ * @return a newly allocated BrokerError, or NULL when none was written.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::tightUnmarshalBrokerError(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs ) throw ( decaf::io::IOException ) {
+
+    try {
+
+        // A clear flag means no error object was marshalled.
+        if( !bs->readBoolean() ) {
+            return NULL;
+        }
+
+        BrokerError* error = new BrokerError();
+
+        error->setExceptionClass( tightUnmarshalString( dataIn, bs ) );
+        error->setMessage( tightUnmarshalString( dataIn, bs ) );
+
+        if( !wireFormat->isStackTraceEnabled() ) {
+            return error;
+        }
+
+        const short frameCount = dataIn->readShort();
+        std::vector< BrokerError::StackTraceElement* > frames;
+
+        for( int ix = 0; ix < frameCount; ++ix ) {
+
+            BrokerError::StackTraceElement* frame =
+                new BrokerError::StackTraceElement;
+
+            frame->ClassName = tightUnmarshalString( dataIn, bs );
+            frame->MethodName = tightUnmarshalString( dataIn, bs );
+            frame->FileName = tightUnmarshalString( dataIn, bs );
+            frame->LineNumber = dataIn->readInt();
+            frames.push_back( frame );
+        }
+
+        error->setStackTraceElements( frames );
+
+        // The cause is encoded recursively, directly after the frames.
+        error->setCause( dynamic_cast<BrokerError*>(
+            tightUnmarshalBrokerError( wireFormat, dataIn, bs ) ) );
+
+        return error;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * First (sizing) pass of tight marshalling for a BrokerError.  Writes a
+ * presence flag plus the string flags to the boolean stream and adds up
+ * the number of payload bytes the second pass will need.
+ *
+ * @param wireFormat - consulted for the stack-trace-enabled setting.
+ * @param data - the object to marshal; anything that is not a BrokerError
+ *        (including NULL) is recorded as "no error".
+ * @param bs - boolean stream that receives the flags.
+ * @return the accumulated payload size in bytes, 0 when no error is present.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+int BaseDataStreamMarshaller::tightMarshalBrokerError1(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        BrokerError* error = dynamic_cast<BrokerError*>( data );
+
+        if( error == NULL ) {
+            bs->writeBoolean( false );
+            return 0;
+        }
+
+        bs->writeBoolean( true );
+
+        int total = 0;
+        total += tightMarshalString1( error->getExceptionClass(), bs );
+        total += tightMarshalString1( error->getMessage(), bs );
+
+        if( !wireFormat->isStackTraceEnabled() ) {
+            return total;
+        }
+
+        // Two bytes for the element count written by the second pass.
+        total += 2;
+
+        for( unsigned int ix = 0; ix < error->getStackTraceElements().size(); ++ix ) {
+
+            const BrokerError::StackTraceElement* frame =
+                error->getStackTraceElements()[ix];
+
+            total += tightMarshalString1( frame->ClassName, bs );
+            total += tightMarshalString1( frame->MethodName, bs );
+            total += tightMarshalString1( frame->FileName, bs );
+
+            // Four bytes for the line number.
+            total += 4;
+        }
+
+        // The cause is sized recursively.
+        total += tightMarshalBrokerError1( wireFormat, error->getCause(), bs );
+
+        return total;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Second (writing) pass of tight marshalling for a BrokerError.  Consumes
+ * the flags from the boolean stream and writes the error's strings and,
+ * when stack traces are enabled on the wire format, its stack trace
+ * elements followed by the recursively encoded cause.
+ *
+ * NOTE(review): when the presence flag is set, the result of the
+ * dynamic_cast is used without a NULL check.
+ *
+ * @param wireFormat - consulted for the stack-trace-enabled setting.
+ * @param data - the object to marshal.
+ * @param dataOut - stream the payload is written to.
+ * @param bs - boolean stream holding the flags.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::tightMarshalBrokerError2(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    decaf::io::DataOutputStream* dataOut,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // A clear flag means there is nothing to write.
+        if( !bs->readBoolean() ) {
+            return;
+        }
+
+        BrokerError* error = dynamic_cast<BrokerError*>( data );
+
+        tightMarshalString2( error->getExceptionClass(), dataOut, bs );
+        tightMarshalString2( error->getMessage(), dataOut, bs );
+
+        if( !wireFormat->isStackTraceEnabled() ) {
+            return;
+        }
+
+        // The count goes on the wire as a 16 bit value.
+        const int frameCount = (short)error->getStackTraceElements().size();
+        dataOut->writeShort( (short)frameCount );
+
+        for( int ix = 0; ix < frameCount; ++ix ) {
+
+            BrokerError::StackTraceElement* frame =
+                error->getStackTraceElements()[ix];
+
+            tightMarshalString2( frame->ClassName, dataOut, bs );
+            tightMarshalString2( frame->MethodName, dataOut, bs );
+            tightMarshalString2( frame->FileName, dataOut, bs );
+            dataOut->writeInt( frame->LineNumber );
+        }
+
+        // The cause is written recursively, directly after the frames.
+        tightMarshalBrokerError2(
+            wireFormat, error->getCause(), dataOut, bs );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a BrokerError written in the loose encoding.  A boolean read from
+ * the data stream says whether an error is present.  When the wire format
+ * has stack traces enabled, the stack trace elements and a recursively
+ * encoded cause are read as well.
+ *
+ * NOTE(review): the objects created with new in this method are not deleted
+ * on the path where a later read throws.
+ *
+ * @param wireFormat - consulted for the stack-trace-enabled setting.
+ * @param dataIn - stream the flag and payload are read from.
+ * @return a newly allocated BrokerError, or NULL when none was written.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+commands::DataStructure* BaseDataStreamMarshaller::looseUnmarshalBrokerError(
+    OpenWireFormat* wireFormat,
+    decaf::io::DataInputStream* dataIn )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // A clear flag means no error object was marshalled.
+        if( !dataIn->readBoolean() ) {
+            return NULL;
+        }
+
+        BrokerError* error = new BrokerError();
+
+        error->setExceptionClass( looseUnmarshalString( dataIn ) );
+        error->setMessage( looseUnmarshalString( dataIn ) );
+
+        if( !wireFormat->isStackTraceEnabled() ) {
+            return error;
+        }
+
+        const short frameCount = dataIn->readShort();
+        std::vector< BrokerError::StackTraceElement* > frames;
+
+        for( int ix = 0; ix < frameCount; ++ix ) {
+
+            BrokerError::StackTraceElement* frame =
+                new BrokerError::StackTraceElement;
+
+            frame->ClassName = looseUnmarshalString( dataIn );
+            frame->MethodName = looseUnmarshalString( dataIn );
+            frame->FileName = looseUnmarshalString( dataIn );
+            frame->LineNumber = dataIn->readInt();
+
+            frames.push_back( frame );
+        }
+
+        error->setStackTraceElements( frames );
+
+        // The cause is encoded recursively, directly after the frames.
+        error->setCause( dynamic_cast<BrokerError*>(
+            looseUnmarshalBrokerError( wireFormat, dataIn ) ) );
+
+        return error;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Writes a BrokerError in the loose encoding: a boolean presence flag, the
+ * exception class and message strings and, when stack traces are enabled
+ * on the wire format, the stack trace elements followed by the recursively
+ * encoded cause.
+ *
+ * The presence flag is written with writeBoolean() so that it explicitly
+ * pairs with the readBoolean() in looseUnmarshalBrokerError and matches the
+ * other loose marshalling helpers.  It previously went through write()
+ * with an implicit bool conversion.
+ *
+ * @param wireFormat - consulted for the stack-trace-enabled setting.
+ * @param data - the object to marshal; anything that is not a BrokerError
+ *        (including NULL) is written as "no error".
+ * @param dataOut - stream the flag and payload are written to.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+void BaseDataStreamMarshaller::looseMarshalBrokerError(
+    OpenWireFormat* wireFormat,
+    commands::DataStructure* data,
+    decaf::io::DataOutputStream* dataOut )
+        throw ( decaf::io::IOException ) {
+
+    try{
+
+        BrokerError* error = dynamic_cast<BrokerError*>( data );
+
+        dataOut->writeBoolean( error != NULL );
+
+        if( error != NULL ){
+
+            looseMarshalString( error->getExceptionClass(), dataOut );
+            looseMarshalString( error->getMessage(), dataOut );
+
+            if( wireFormat->isStackTraceEnabled() ) {
+
+                size_t length = error->getStackTraceElements().size();
+
+                // The count goes on the wire as a 16 bit value.
+                dataOut->writeShort( (short)length );
+
+                for( size_t i = 0; i < length; ++i ) {
+
+                    BrokerError::StackTraceElement* element =
+                        error->getStackTraceElements()[i];
+
+                    looseMarshalString( element->ClassName, dataOut );
+                    looseMarshalString( element->MethodName, dataOut );
+                    looseMarshalString( element->FileName, dataOut );
+
+                    dataOut->writeInt( element->LineNumber );
+                }
+
+                // The cause is written recursively, directly after the frames.
+                looseMarshalBrokerError( wireFormat, error->getCause(), dataOut );
+            }
+        }
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a byte array written in the tight encoding.  A flag from the
+ * boolean stream says whether an array is present; if so an int length is
+ * read and then that many bytes.
+ *
+ * @param dataIn - stream the length and bytes are read from.
+ * @param bs - boolean stream holding the presence flag.
+ * @return the bytes read, or an empty vector when the flag is clear.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::vector<unsigned char> BaseDataStreamMarshaller::tightUnmarshalByteArray(
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        std::vector<unsigned char> bytes;
+
+        // A clear flag means no array was written.
+        if( !bs->readBoolean() ) {
+            return bytes;
+        }
+
+        const int length = dataIn->readInt();
+        bytes.resize( length );
+        dataIn->readFully( bytes );
+
+        return bytes;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a byte array written in the loose encoding: a boolean presence
+ * flag, then (when set) an int length followed by that many bytes.
+ *
+ * @param dataIn - stream the flag, length and bytes are read from.
+ * @return the bytes read, or an empty vector when the flag is clear.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::vector<unsigned char> BaseDataStreamMarshaller::looseUnmarshalByteArray(
+    decaf::io::DataInputStream* dataIn )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // A clear flag means no array was written.
+        if( !dataIn->readBoolean() ) {
+            return std::vector<unsigned char>();
+        }
+
+        const int length = dataIn->readInt();
+
+        std::vector<unsigned char> bytes;
+        bytes.resize( length );
+        dataIn->readFully( bytes );
+
+        return bytes;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a fixed-size byte array in the tight encoding.  No flag or length
+ * is read; exactly the requested number of bytes is taken from the stream.
+ *
+ * @param dataIn - stream the bytes are read from.
+ * @param bs - unused.
+ * @param size - number of bytes to read.
+ * @return the bytes read.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::vector<unsigned char> BaseDataStreamMarshaller::tightUnmarshalConstByteArray(
+    decaf::io::DataInputStream* dataIn,
+    utils::BooleanStream* bs AMQCPP_UNUSED,
+    int size )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // Size the buffer first; readFully fills the whole vector.
+        std::vector<unsigned char> bytes;
+        bytes.resize( size );
+
+        dataIn->readFully( bytes );
+
+        return bytes;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads a fixed-size byte array in the loose encoding.  No flag or length
+ * is read; exactly the requested number of bytes is taken from the stream.
+ *
+ * @param dataIn - stream the bytes are read from.
+ * @param size - number of bytes to read.
+ * @return the bytes read.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::vector<unsigned char> BaseDataStreamMarshaller::looseUnmarshalConstByteArray(
+    decaf::io::DataInputStream* dataIn,
+    int size )
+        throw ( decaf::io::IOException ) {
+
+    try {
+
+        // Size the buffer first; readFully fills the whole vector.
+        std::vector<unsigned char> bytes;
+        bytes.resize( size );
+
+        dataIn->readFully( bytes );
+
+        return bytes;
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Formats a MessageId as "<producer id>:<producer sequence id>".
+ *
+ * @param id - the id to format, may be NULL.
+ * @return the formatted text, or an empty string for a NULL id.
+ */
+std::string BaseDataStreamMarshaller::toString( const commands::MessageId* id ) {
+
+    if( id == NULL ) {
+        return "";
+    }
+
+    return toString( id->getProducerId() ) + ":" +
+           Long::toString( id->getProducerSequenceId() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Formats a ProducerId as "<connection id>:<session id>:<value>".
+ *
+ * A NULL id yields an empty string, matching the MessageId overload, which
+ * passes the result of getProducerId() straight to this method.
+ *
+ * @param id - the id to format, may be NULL.
+ * @return the formatted text, or an empty string for a NULL id.
+ */
+std::string BaseDataStreamMarshaller::toString( const commands::ProducerId* id ) {
+    if( id == NULL ) return "";
+
+    return id->getConnectionId() + ":" +
+           Long::toString( id->getSessionId() ) + ":" +
+           Long::toString( id->getValue() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Formats a TransactionId.  A LocalTransactionId is rendered as its numeric
+ * value; an XATransactionId as
+ * "XID:<format id>:<global transaction id hex>:<branch qualifier hex>".
+ *
+ * @param txnId - the id to format.
+ * @return the formatted text, or an empty string when the id is neither of
+ *         the two known types.
+ */
+std::string BaseDataStreamMarshaller::toString( const commands::TransactionId* txnId ) {
+
+    // Local transactions are checked first.
+    const LocalTransactionId* localId =
+        dynamic_cast<const LocalTransactionId*>( txnId );
+
+    if( localId != NULL ) {
+        return Long::toString( localId->getValue() );
+    }
+
+    const XATransactionId* xaId =
+        dynamic_cast<const XATransactionId*>( txnId );
+
+    if( xaId == NULL ) {
+        return "";
+    }
+
+    return string("XID:") + Integer::toString( xaId->getFormatId() ) + ":" +
+           toHexFromBytes( xaId->getGlobalTransactionId() ) + ":" +
+           toHexFromBytes( xaId->getBranchQualifier() );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Builds a string by appending the hexTable entry for each byte of the
+ * input, in order.
+ *
+ * @param data - the bytes to convert.
+ * @return the concatenated table entries; empty for empty input.
+ */
+std::string BaseDataStreamMarshaller::toHexFromBytes(
+    const std::vector<unsigned char>& data ) {
+
+    std::string hex = "";
+
+    std::vector<unsigned char>::const_iterator byteIter = data.begin();
+    for( ; byteIter != data.end(); ++byteIter ) {
+        hex.append( hexTable[ *byteIter ] );
+    }
+
+    return hex;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+/**
+ * Reads an ASCII string stored as a 16 bit length followed by that many
+ * raw bytes, the layout produced by the ASCII branch of tightMarshalString2.
+ *
+ * The scratch buffer is a std::vector, so it is released even when
+ * readFully() throws.  The length is read unsigned: the writer casts the
+ * length to short, and a signed read would turn lengths above 32767 into a
+ * negative buffer size.
+ *
+ * @param dataIn - stream the length and bytes are read from.
+ * @return the string read; empty when the stored length is zero.
+ * @throws IOException - the only exception type this method lets escape.
+ */
+std::string BaseDataStreamMarshaller::readAsciiString(
+    decaf::io::DataInputStream* dataIn )
+        throw ( decaf::io::IOException ) {
+
+    try{
+
+        const int size = dataIn->readUnsignedShort();
+
+        // Nothing follows a zero length.
+        if( size == 0 ) {
+            return std::string();
+        }
+
+        std::vector<unsigned char> data;
+        data.resize( size );
+        dataIn->readFully( data );
+
+        // Copy all bytes as-is, embedded NULs included.
+        return std::string( data.begin(), data.end() );
+    }
+    AMQ_CATCH_RETHROW( IOException )
+    AMQ_CATCH_EXCEPTION_CONVERT( Exception, IOException )
+    AMQ_CATCHALL_THROW( IOException )
+}

Propchange: activemq/activemq-cpp/trunk/src/main/activemq/wireformat/openwire/marshal/BaseDataStreamMarshaller.cpp
------------------------------------------------------------------------------
    svn:eol-style = native



Mime
View raw message