Subject: svn commit: r505718 - in /activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire: OpenWireFormat.h OpenWireFormatNegotiator.cpp OpenWireFormatNegotiator.h commands/WireFormatInfo.h
Date: Sat, 10 Feb 2007 16:37:29 -0000
To: commits@activemq.apache.org
From: tabish@apache.org
Reply-To: dev@activemq.apache.org
Message-Id: <20070210163729.EB77A1A981A@eris.apache.org>

Author: tabish
Date: Sat Feb 10 08:37:29 2007
New Revision: 505718

URL: http://svn.apache.org/viewvc?view=rev&rev=505718
Log:
http://issues.apache.org/activemq/browse/AMQCPP-30

Modified:
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
    activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormat.h Sat Feb 10 08:37:29 2007
@@ -38,7 +38,7 @@
         /**
          * Constructs a new OpenWireFormat object
          * @param properties - can contain optional config params.
-         */
+         */
         OpenWireFormat( const activemq::util::Properties& properties );

         virtual ~OpenWireFormat();
@@ -54,17 +54,17 @@
          * Stream based marshaling
          * @param command - The Command to Marshal
          * @param out - the output stream to write the command to.
-         * @throws IOException
+         * @throws IOException
          */
-        virtual void marshal( transport::Command* command,
-                              io::DataOutputStream* dataOut )
+        virtual void marshal( transport::Command* command,
+                              io::DataOutputStream* dataOut )
             throw ( io::IOException );

         /**
-         * Packet based un-marshaling
+         * Packet based un-marshaling
          * @param dis - the input stream to read the command from.
          * @returns the newly marshaled Command, caller owns the pointer
-         * @throws IOException
+         * @throws IOException
          */
         virtual transport::Command* unmarshal( io::DataInputStream* dis )
             throw ( io::IOException );
@@ -75,7 +75,7 @@
          * @param bs - the BooleanStream to write to
          * @returns size of the data returned.
          */
-        virtual int tightMarshalNestedObject1( commands::DataStructure* object,
+        virtual int tightMarshalNestedObject1( commands::DataStructure* object,
                                                utils::BooleanStream* bs )
             throw ( io::IOException );

@@ -88,8 +88,8 @@
          * @param bs - BooleanStream
          * @throws IOException if an error occurs.
          */
-        void tightMarshalNestedObject2( commands::DataStructure* o,
-                                        io::DataOutputStream* ds,
+        void tightMarshalNestedObject2( commands::DataStructure* o,
+                                        io::DataOutputStream* ds,
                                         utils::BooleanStream* bs )
             throw ( io::IOException );
         /**
@@ -101,12 +101,12 @@
          * @returns Newly allocated DataStructure Object
          * @throws IOException if an error occurs.
          */
-        commands::DataStructure* tightUnmarshalNestedObject( io::DataInputStream* dis,
-                                                             utils::BooleanStream* bs )
+        commands::DataStructure* tightUnmarshalNestedObject( io::DataInputStream* dis,
+                                                             utils::BooleanStream* bs )
             throw ( io::IOException );

         /**
-         * Utility method to unmarshal an DataStructure object from an
+         * Utility method to unmarshal an DataStructure object from an
          * DataInputStream using the Loose Unmarshalling format. Will read
          * the Data and construct a new DataStructure based Object, the
          * pointer to the Object returned is now owned by the caller.
@@ -114,20 +114,20 @@
          * @returns a new DataStructure derived Object pointer
          * @throws IOException if an error occurs.
          */
-        commands::DataStructure* looseUnmarshalNestedObject(
-            io::DataInputStream* dis)
+        commands::DataStructure* looseUnmarshalNestedObject(
+            io::DataInputStream* dis)
             throw ( io::IOException );

         /**
          * Utility method to loosely Marshal an object that is derived from the
-         * DataStrucutre interface. The marshalled data is written to the
+         * DataStrucutre interface. The marshalled data is written to the
          * passed in DataOutputStream.
          * @param o - DataStructure derived Object to Marshal
          * @param dataOut - DataOutputStream to write the data to
          * @throw IOException if an error occurs.
          */
-        void looseMarshalNestedObject( commands::DataStructure* o,
-                                       io::DataOutputStream* dataOut )
+        void looseMarshalNestedObject( commands::DataStructure* o,
+                                       io::DataOutputStream* dataOut )
             throw ( io::IOException );

         /**
@@ -136,7 +136,7 @@
          * @param info - The new Wireformat Info settings
          * @throws IllegalStateException is the params can't be negotiated.
          */
-        void renegotiateWireFormat( commands::WireFormatInfo* info )
+        void renegotiateWireFormat( commands::WireFormatInfo* info )
             throw ( exceptions::IllegalStateException );

         /**
@@ -147,6 +147,14 @@
             throw ( exceptions::IllegalStateException );

         /**
+         * Gets the Preferend WireFormatInfo object that this class holds
+         * @return pointer to a prefered WireFormatInfo object
+         */
+        virtual commands::WireFormatInfo* getPreferedWireFormatInfo() const {
+            return this->preferedWireFormatInfo;
+        }
+
+        /**
          * Checks if the stackTraceEnabled flag is on
          * @return true if the flag is on.
          */
@@ -246,20 +254,20 @@

         /**
          * Perform the actual unmarshal of data from the given DataInputStream
-         * return the unmarshalled DataStrucutre object once done, caller takes
-         * ownership of this object. This method can return null if the type
+         * return the unmarshalled DataStrucutre object once done, caller takes
+         * ownership of this object. This method can return null if the type
          * of the object to unmarshal is NULL, empty data.
          * @param dis - DataInputStream to read from
          * @returns new DataStructure* that the caller owns
          * @throws IOException if an error occurs during the unmarshal
          */
-        commands::DataStructure* doUnmarshal( io::DataInputStream* dis )
+        commands::DataStructure* doUnmarshal( io::DataInputStream* dis )
             throw ( io::IOException );

     protected:

-        // Declared here to make life easier.
-        static const unsigned char NULL_TYPE;
+        // Declared here to make life easier.
+        static const unsigned char NULL_TYPE;

     private:


Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.cpp Sat Feb 10 08:37:29 2007
@@ -17,12 +17,16 @@

 #include "OpenWireFormatNegotiator.h"

+#include
+#include
+
 using namespace activemq;
 using namespace activemq::exceptions;
 using namespace activemq::transport;
 using namespace activemq::concurrent;
 using namespace activemq::connector;
 using namespace activemq::connector::openwire;
+using namespace activemq::connector::openwire::commands;

 ////////////////////////////////////////////////////////////////////////////////
 OpenWireFormatNegotiator::OpenWireFormatNegotiator( OpenWireFormat* openWireFormat,
@@ -52,8 +56,18 @@
     try{

         if( closed || next == NULL ){
-            throw CommandIOException( __FILE__, __LINE__,
-                "transport already closed" );
+            throw CommandIOException(
+                __FILE__, __LINE__,
+                "OpenWireFormatNegotiator::oneway - transport already closed" );
+        }
+
+        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
+            throw CommandIOException(
+                __FILE__,
+                __LINE__,
+                "OpenWireFormatNegotiator::oneway"
+                "Wire format negociation timeout: peer did not "
+                "send his wire format." );
         }

         next->oneway( command );
@@ -65,9 +79,49 @@
 }

 ////////////////////////////////////////////////////////////////////////////////
-void OpenWireFormatNegotiator::onCommand( Command* command ){
+void OpenWireFormatNegotiator::onCommand( Command* command ) {
+
+    DataStructure* dataStructure =
+        dynamic_cast<DataStructure*>( command );
+
+    if( dataStructure != NULL &&
+        dataStructure->getDataStructureType() == WireFormatInfo::ID_WIREFORMATINFO ) {
+
+        WireFormatInfo* info = dynamic_cast<WireFormatInfo*>( dataStructure );
+
+        try {
+
+            if( !info->isValid() ) {
+                throw CommandIOException(
+                    __FILE__,
+                    __LINE__,
+                    "OpenWireFormatNegotiator::onCommand"
+                    "Remote wire format magic is invalid" );
+            }
+
+            wireInfoSentDownLatch.await( negotiationTimeout );
+            openWireFormat->renegotiateWireFormat( info );
+            readyCountDownLatch.countDown();
+        } catch( exceptions::ActiveMQException& ex ) {
+
+            readyCountDownLatch.countDown();
+            fire( ex );
+        }
+    }
+
+    // Send along to the next interested party.
+    fire( command );
+}
+
+////////////////////////////////////////////////////////////////////////////////
+void OpenWireFormatNegotiator::onTransportException(
+    Transport* source,
+    const exceptions::ActiveMQException& ex ) {
+
+    readyCountDownLatch.countDown();
+    fire( ex );

 }

 ////////////////////////////////////////////////////////////////////////////////
@@ -106,11 +160,15 @@
             firstTime = false;

             // We first send the WireFormat that we'd prefer.
-            //next.Oneway( wireFormat->getPreferedWireFormatInfo() );
+            next->oneway( openWireFormat->getPreferedWireFormatInfo() );
+
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();

         } catch( ActiveMQException& ex ) {

-            //wireInfoSentDownLatch.countDown();
+            // Mark the latch
+            wireInfoSentDownLatch.countDown();
             ex.setMark( __FILE__, __LINE__ );
             throw ex;
         }

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/OpenWireFormatNegotiator.h Sat Feb 10 08:37:29 2007
@@ -44,7 +44,7 @@
         bool firstTime;

         /**
-         * Latch to count down till we receive the wireFormat info
+         * Latch objects to count down till we receive the wireFormat info
          */
         concurrent::CountDownLatch wireInfoSentDownLatch;
         concurrent::CountDownLatch readyCountDownLatch;
@@ -94,6 +94,15 @@
          * @param command the received from the nested transport.
          */
         virtual void onCommand( transport::Command* command );
+
+        /**
+         * Event handler for an exception from a command transport.
+         * @param source The source of the exception
+         * @param ex The exception.
+         */
+        virtual void onTransportException(
+            transport::Transport* source,
+            const exceptions::ActiveMQException& ex );

         /**
          * Starts this transport object and creates the thread for

Modified: activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h
URL: http://svn.apache.org/viewvc/activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h?view=diff&rev=505718&r1=505717&r2=505718
==============================================================================
--- activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h (original)
+++ activemq/activemq-cpp/trunk/activemq-cpp/src/main/activemq/connector/openwire/commands/WireFormatInfo.h Sat Feb 10 08:37:29 2007
@@ -18,7 +18,8 @@
 #ifndef _ACTIVEMQ_CONNECTOR_OPENWIRE_COMMANDS_WIREFORMATINFO_H_
 #define _ACTIVEMQ_CONNECTOR_OPENWIRE_COMMANDS_WIREFORMATINFO_H_

-#include
+#include
+#include

 #include

@@ -27,7 +28,7 @@
 namespace openwire{
 namespace commands{

-    class WireFormatInfo : public BaseDataStructure
+    class WireFormatInfo : public BaseCommand
     {
     public:

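
Note on the latch ordering in this revision: the negotiator changes above rely on two latches, one counted down after the local WireFormatInfo has been sent and one counted down after the peer's WireFormatInfo has been processed, with oneway() blocking on the second. The following is only a minimal sketch of that ordering, written against the standard library. SketchLatch, NegotiatorSketch and runHandshake are hypothetical names invented for illustration; they are not the ActiveMQ-CPP classes, and the real concurrent::CountDownLatch may behave differently.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical stand-in for a count-down latch; NOT concurrent::CountDownLatch.
class SketchLatch {
public:
    explicit SketchLatch( int count ) : count_( count ) {}

    // Decrements the count and wakes all waiters once it reaches zero.
    void countDown() {
        std::lock_guard<std::mutex> lock( mutex_ );
        if( count_ > 0 && --count_ == 0 ) {
            cond_.notify_all();
        }
    }

    // Waits until the count is zero; returns false if the timeout expires first.
    bool await( std::chrono::milliseconds timeout ) {
        std::unique_lock<std::mutex> lock( mutex_ );
        return cond_.wait_for( lock, timeout, [this] { return count_ == 0; } );
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Hypothetical model of the negotiator; only the latch ordering is shown.
struct NegotiatorSketch {
    SketchLatch wireInfoSent{ 1 };
    SketchLatch ready{ 1 };

    // Models start(): our preferred wire format has been sent to the peer.
    void markWireInfoSent() { wireInfoSent.countDown(); }

    // Models onCommand(): the peer's wire format arrived on the reader thread.
    void onPeerWireFormat( std::chrono::milliseconds timeout ) {
        wireInfoSent.await( timeout );  // do not renegotiate before we have sent ours
        ready.countDown();              // stands in for renegotiateWireFormat( info )
    }

    // Models oneway(): ordinary commands go out only after negotiation completes.
    bool canSend( std::chrono::milliseconds timeout ) {
        return ready.await( timeout );
    }
};

// Runs one handshake; returns true if a send would be allowed within the timeout.
bool runHandshake( bool peerResponds, std::chrono::milliseconds timeout ) {
    NegotiatorSketch n;
    n.markWireInfoSent();
    std::thread reader;
    if( peerResponds ) {
        reader = std::thread( [&n, timeout] { n.onPeerWireFormat( timeout ); } );
    }
    bool ok = n.canSend( timeout );
    if( reader.joinable() ) {
        reader.join();
    }
    return ok;
}
```

Calling runHandshake( true, ... ) models a peer that answers, so the send is allowed. With peerResponds set to false the wait on the ready latch times out, which corresponds to the case where oneway() in the diff throws a CommandIOException.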