activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From tab...@apache.org
Subject svn commit: r505718 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireFormat.h OpenWireFormatNegotiator.cpp OpenWireFormatNegotiator.h commands/WireFormatInfo.h
Date Sat, 10 Feb 2007 16:37:29 GMT
Author: tabish
Date: Sat Feb 10 08:37:29 2007
New Revision: 505718

URL: http://svn.apache.org/viewvc?view=rev&rev=505718
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
Sat Feb 10 08:37:29 2007
@@ -38,7 +38,7 @@
         /**
          * Constructs a new OpenWireFormat object
          * @param properties - can contain optional config params.
-         */    
+         */
         OpenWireFormat( const activemq::util::Properties& properties );
 
         virtual ~OpenWireFormat();
@@ -54,17 +54,17 @@
          * Stream based marshaling
          * @param command - The Command to Marshal
          * @param out - the output stream to write the command to.
-         * @throws IOException 
+         * @throws IOException
          */
-        virtual void marshal( transport::Command* command, 
-                              io::DataOutputStream* dataOut ) 
+        virtual void marshal( transport::Command* command,
+                              io::DataOutputStream* dataOut )
             throw ( io::IOException );
 
         /**
-         * Packet based un-marshaling 
+         * Packet based un-marshaling
          * @param dis - the input stream to read the command from.
          * @returns the newly marshaled Command, caller owns the pointer
-         * @throws IOException 
+         * @throws IOException
          */
         virtual transport::Command* unmarshal( io::DataInputStream* dis ) throw ( io::IOException
);
 
@@ -75,7 +75,7 @@
          * @param bs - the BooleanStream to write to
          * @returns size of the data returned.
          */
-        virtual int tightMarshalNestedObject1( commands::DataStructure* object, 
+        virtual int tightMarshalNestedObject1( commands::DataStructure* object,
                                                utils::BooleanStream* bs )
             throw ( io::IOException );
 
@@ -88,8 +88,8 @@
          * @param bs - BooleanStream
          * @throws IOException if an error occurs.
          */
-        void tightMarshalNestedObject2( commands::DataStructure* o, 
-                                        io::DataOutputStream* ds, 
+        void tightMarshalNestedObject2( commands::DataStructure* o,
+                                        io::DataOutputStream* ds,
                                         utils::BooleanStream* bs ) throw ( io::IOException
);
 
         /**
@@ -101,12 +101,12 @@
          * @returns Newly allocated DataStructure Object
          * @throws IOException if an error occurs.
          */
-        commands::DataStructure* tightUnmarshalNestedObject( io::DataInputStream* dis, 
-                                                             utils::BooleanStream* bs ) 
+        commands::DataStructure* tightUnmarshalNestedObject( io::DataInputStream* dis,
+                                                             utils::BooleanStream* bs )
             throw ( io::IOException );
 
         /**
-         * Utility method to unmarshal an DataStructure object from an 
+         * Utility method to unmarshal an DataStructure object from an
          * DataInputStream using the Loose Unmarshalling format.  Will read
          * the Data and construct a new DataStructure based Object, the
          * pointer to the Object returned is now owned by the caller.
@@ -114,20 +114,20 @@
          * @returns a new DataStructure derived Object pointer
          * @throws IOException if an error occurs.
          */
-        commands::DataStructure* looseUnmarshalNestedObject( 
-            io::DataInputStream* dis) 
+        commands::DataStructure* looseUnmarshalNestedObject(
+            io::DataInputStream* dis)
                 throw ( io::IOException );
 
         /**
          * Utility method to loosely Marshal an object that is derived from the
-         * DataStrucutre interface.  The marshalled data is written to the 
+         * DataStrucutre interface.  The marshalled data is written to the
          * passed in DataOutputStream.
          * @param o - DataStructure derived Object to Marshal
          * @param dataOut - DataOutputStream to write the data to
          * @throw IOException if an error occurs.
          */
-        void looseMarshalNestedObject( commands::DataStructure* o, 
-                                       io::DataOutputStream* dataOut ) 
+        void looseMarshalNestedObject( commands::DataStructure* o,
+                                       io::DataOutputStream* dataOut )
                                            throw ( io::IOException );
 
         /**
@@ -136,7 +136,7 @@
          * @param info - The new Wireformat Info settings
          * @throws IllegalStateException is the params can't be negotiated.
          */
-        void renegotiateWireFormat( commands::WireFormatInfo* info ) 
+        void renegotiateWireFormat( commands::WireFormatInfo* info )
             throw ( exceptions::IllegalStateException );
 
         /**
@@ -147,6 +147,14 @@
             throw ( exceptions::IllegalStateException );
 
         /**
+         * Gets the Preferend WireFormatInfo object that this class holds
+         * @return pointer to a prefered WireFormatInfo object
+         */
+        virtual commands::WireFormatInfo* getPreferedWireFormatInfo() const {
+            return this->preferedWireFormatInfo;
+        }
+
+        /**
          * Checks if the stackTraceEnabled flag is on
          * @return true if the flag is on.
          */
@@ -246,20 +254,20 @@
 
         /**
          * Perform the actual unmarshal of data from the given DataInputStream
-         * return the unmarshalled DataStrucutre object once done, caller takes 
-         * ownership of this object.  This method can return null if the type 
+         * return the unmarshalled DataStrucutre object once done, caller takes
+         * ownership of this object.  This method can return null if the type
          * of the object to unmarshal is NULL, empty data.
          * @param dis - DataInputStream to read from
          * @returns new DataStructure* that the caller owns
          * @throws IOException if an error occurs during the unmarshal
          */
-        commands::DataStructure* doUnmarshal( io::DataInputStream* dis ) 
+        commands::DataStructure* doUnmarshal( io::DataInputStream* dis )
             throw ( io::IOException );
 
     protected:
 
-        // Declared here to make life easier.    
-        static const unsigned char NULL_TYPE; 
+        // Declared here to make life easier.
+        static const unsigned char NULL_TYPE;
 
     private:
 

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
Sat Feb 10 08:37:29 2007
@@ -17,12 +17,16 @@
 
 #include "OpenWireFormatNegotiator.h"
 
+#include <activemq/connector/openwire/commands/DataStructure.h>
+#include <activemq/connector/openwire/commands/WireFormatInfo.h>
+
 using namespace activemq;
 using namespace activemq::exceptions;
 using namespace activemq::transport;
 using namespace activemq::concurrent;
 using namespace activemq::connector;
 using namespace activemq::connector::openwire;
+using namespace activemq::connector::openwire::commands;
 
 ////////////////////////////////////////////////////////////////////////////////
 OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
@@ -52,8 +56,18 @@
     try{
 
         if( closed || next == NULL ){
-            throw CommandIOException( __FILE__, __LINE__,
-                "transport already closed" );
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::oneway - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::oneway"
+                "Wire format negociation timeout: peer did not "
+                "send his wire format." );
         }
 
         next->oneway( command );
@@ -65,9 +79,49 @@
 }
 
 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onCommand( Command* command ){
+void OpenWireFormatNegotiator::onCommand( Command* command ) {
+
+    DataStructure* dataStructure =
+        dynamic_cast<DataStructure*>( command );
+
+    if( dataStructure != NULL &&
+        dataStructure->getDataStructureType() == WireFormatInfo::ID_WIREFORMATINFO ) {
+
+        WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( dataStructure );
+
+        try {
+
+            if( !info->isValid() ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "OpenWireFormatNegotiator::onCommand"
+                    "Remote wire format magic is invalid" );
+            }
+
+            wireInfoSentDownLatch.await( negotiationTimeout );
+            openWireFormat->renegotiateWireFormat( info );
 
+            readyCountDownLatch.countDown();
 
+        } catch( exceptions::ActiveMQException& ex ) {
+
+            readyCountDownLatch.countDown();
+            fire( ex );
+        }
+    }
+
+    // Send along to the next interested party.
+    fire( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::onTransportException(
+    Transport* source,
+    const exceptions::ActiveMQException& ex ) {
+
+    readyCountDownLatch.countDown();
+    fire( ex );
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -106,11 +160,15 @@
             firstTime = false;
 
             // We first send the WireFormat that we'd prefer.
-            //next.Oneway( wireFormat->getPreferedWireFormatInfo() );
+            next->oneway( openWireFormat->getPreferedWireFormatInfo() );
+
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();
 
         } catch( ActiveMQException& ex ) {
 
-            //wireInfoSentDownLatch.countDown();
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();
             ex.setMark( __FILE__, __LINE__ );
             throw ex;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
Sat Feb 10 08:37:29 2007
@@ -44,7 +44,7 @@
         bool firstTime;
 
         /**
-         * Latch to count down till we receive the wireFormat info
+         * Latch objects to count down till we receive the wireFormat info
          */
         concurrent::CountDownLatch wireInfoSentDownLatch;
         concurrent::CountDownLatch readyCountDownLatch;
@@ -94,6 +94,15 @@
          * @param command the received from the nested transport.
          */
         virtual void onCommand( transport::Command* command );
+
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException(
+            transport::Transport* source,
+            const exceptions::ActiveMQException& ex );
 
         /**
          * Starts this transport object and creates the thread for

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h
(original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h
Sat Feb 10 08:37:29 2007
@@ -18,7 +18,8 @@
 #ifndef _ACTIVEMQ_CONNECTOR_OPENWIRE_COMMANDS_WIREFORMATINFO_H_
 #define _ACTIVEMQ_CONNECTOR_OPENWIRE_COMMANDS_WIREFORMATINFO_H_
 
-#include <activemq/connector/openwire/commands/BaseDataStructure.h>
+#include <activemq/connector/openwire/commands/BaseCommand.h>
+#include <activemq/transport/Command.h>
 
 #include <vector>
 
@@ -27,7 +28,7 @@
 namespace openwire{
 namespace commands{
 
-    class WireFormatInfo : public BaseDataStructure
+    class WireFormatInfo : public BaseCommand<transport::Command>
     {
     public:
 



Mime
View raw message