activemq-dev mailing list archives

From Timothy Bish <tim.b...@sensis.com>
Subject Re: stomp::commands::ConnectedCommand openwire analog
Date Fri, 15 Jun 2007 14:56:56 GMT
Actually the OpenWireFormatNegotiator wraps around the other Transports,
so you don't need to call that at all.

You really need to dissect the OpenWireConnector to understand the
messages; that class handles all the messaging for OpenWire save for the
WireFormat negotiation, which is done in OpenWireFormatNegotiator.

If you look at the code for OpenWireFormatNegotiator you see that in its
start method it sends a WireFormat command to the broker.

////////////////////////////////////////////////////////////////////////////////
void OpenWireFormatNegotiator::start() throw( cms::CMSException ){

    /**
     * We're already started.
     */
    if( !closed ){
        return;
    }

    if( commandlistener == NULL ){
        throw exceptions::ActiveMQException(
            __FILE__, __LINE__,
            "OpenWireFormatNegotiator::start - "
            "commandListener is invalid" );
    }

    if( exceptionListener == NULL ){
        throw exceptions::ActiveMQException(
            __FILE__, __LINE__,
            "OpenWireFormatNegotiator::start - "
            "exceptionListener is invalid" );
    }

    if( next == NULL ){
        throw exceptions::ActiveMQException(
            __FILE__, __LINE__,
            "OpenWireFormatNegotiator::start - "
            "next transport is NULL" );
    }

    if( openWireFormat == NULL ){
        throw exceptions::ActiveMQException(
            __FILE__, __LINE__,
            "OpenWireFormatNegotiator::start - "
            "openWireFormat is NULL" );
    }

    // Start the delegate transport object.
    next->start();

    if( firstTime == true ) {

        try {

            // The First Time is now over with
            firstTime = false;

            // We first send the WireFormat that we'd prefer.
            next->oneway( openWireFormat->getPreferedWireFormatInfo() );

            // Mark the latch
            wireInfoSentDownLatch.countDown();

        } catch( ActiveMQException& ex ) {

            // Mark the latch
            wireInfoSentDownLatch.countDown();
            ex.setMark( __FILE__, __LINE__ );
            throw ex;
        }
    }

    // Mark it as open.
    closed = false;
}

Now, the OpenWireFormatNegotiator's oneway method won't let any
messages out until the broker has responded with its WireFormat.

////////////////////////////////////////////////////////////////////////////////
void OpenWireFormatNegotiator::oneway( Command* command )
    throw( CommandIOException, exceptions::UnsupportedOperationException ) {

    try{

        if( closed || next == NULL ){
            throw CommandIOException(
                __FILE__, __LINE__,
                "OpenWireFormatNegotiator::oneway - transport already closed" );
        }

        if( !readyCountDownLatch.await( negotiationTimeout ) ) {
            throw CommandIOException(
                __FILE__,
                __LINE__,
                "OpenWireFormatNegotiator::oneway"
                "Wire format negociation timeout: peer did not "
                "send his wire format." );
        }

        next->oneway( command );
    }
    AMQ_CATCH_RETHROW( exceptions::UnsupportedOperationException )
    AMQ_CATCH_RETHROW( CommandIOException )
    AMQ_CATCH_EXCEPTION_CONVERT( exceptions::ActiveMQException, CommandIOException )
    AMQ_CATCHALL_THROW( CommandIOException )
}
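
To see the gating in oneway in isolation, here is a minimal standalone
sketch. SimpleLatch and GatedSender are hypothetical names made up for
this illustration; they are not the activemq-cpp classes, and the sketch
returns false on timeout rather than throwing CommandIOException.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <string>
#include <vector>

// Hypothetical stand-in for a count-down latch (not the library's class).
class SimpleLatch {
public:
    explicit SimpleLatch( int count ) : count_( count ) {}

    // Decrement the count and wake all waiters once it reaches zero.
    void countDown() {
        std::lock_guard<std::mutex> lock( mutex_ );
        if( count_ > 0 && --count_ == 0 ) {
            cond_.notify_all();
        }
    }

    // Returns true if the count reached zero before the timeout expired.
    bool await( std::chrono::milliseconds timeout ) {
        std::unique_lock<std::mutex> lock( mutex_ );
        return cond_.wait_for( lock, timeout, [this] { return count_ == 0; } );
    }

private:
    std::mutex mutex_;
    std::condition_variable cond_;
    int count_;
};

// Hypothetical filter that mimics only the "hold sends until the peer's
// wire format has arrived" behaviour.
class GatedSender {
public:
    explicit GatedSender( std::chrono::milliseconds timeout )
        : ready_( 1 ), timeout_( timeout ) {}

    // Call when the peer's wire format has been received and accepted.
    void onPeerWireFormat() { ready_.countDown(); }

    // Records the command only after negotiation has finished; returns
    // false if the peer never answered within the timeout.
    bool oneway( const std::string& command ) {
        if( !ready_.await( timeout_ ) ) {
            return false;
        }
        sent_.push_back( command );
        return true;
    }

    const std::vector<std::string>& sent() const { return sent_; }

private:
    SimpleLatch ready_;
    std::chrono::milliseconds timeout_;
    std::vector<std::string> sent_;
};

// Usage: a send before the peer's reply times out; afterwards it passes.
inline void demoGatedSender() {
    GatedSender sender( std::chrono::milliseconds( 10 ) );
    assert( !sender.oneway( "ConnectionInfo" ) );
    sender.onPeerWireFormat();
    assert( sender.oneway( "ConnectionInfo" ) );
}
```

This is why a dummy transport that never answers the WireFormat command
makes every later oneway call fail with the negotiation timeout.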


If you look at the OpenWireFormatNegotiator's onCommand method you see
its response to receiving a WireFormat command.

////////////////////////////////////////////////////////////////////////////////
void OpenWireFormatNegotiator::onCommand( Command* command ) {

    DataStructure* dataStructure =
        dynamic_cast<DataStructure*>( command );

    if( dataStructure != NULL &&
        dataStructure->getDataStructureType() ==
            WireFormatInfo::ID_WIREFORMATINFO ) {

        WireFormatInfo* info =
            dynamic_cast<WireFormatInfo*>( dataStructure );

        try {

            if( !info->isValid() ) {
                throw CommandIOException(
                    __FILE__,
                    __LINE__,
                    "OpenWireFormatNegotiator::onCommand"
                    "Remote wire format magic is invalid" );
            }

            wireInfoSentDownLatch.await( negotiationTimeout );
            openWireFormat->renegotiateWireFormat( info );

            readyCountDownLatch.countDown();

        } catch( exceptions::ActiveMQException& ex ) {

            readyCountDownLatch.countDown();
            fire( ex );
        }
    }

    // Send along to the next interested party.
    fire( command );
}

So in your response builder you should send a WireFormat command back to
the client exactly once, in response to the client's initial WireFormat
command.  This could just be a cloned copy of the caller's, which simply
lets the client assume that its preferred wire format is acceptable to
the broker.

The idea of the response builder is that it's emulating all the commands
that would come in from the broker, either synchronously, in the case of
responses to requests, or asynchronously, as in the case of the
WireFormat negotiation.
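
As a rough sketch of the "reply once with a clone" idea, something along
these lines could work. All of the types here (FakeCommand,
FakeWireFormatInfo, FakeConnectionInfo, FakeResponseBuilder) and the
method name buildIncomingCommands are hypothetical stand-ins chosen for
this example; the real builder would use the openwire command classes
and whatever interface DummyTransport expects.

```cpp
#include <cassert>
#include <memory>
#include <vector>

// Hypothetical, simplified command hierarchy (not the library's classes).
struct FakeCommand {
    virtual ~FakeCommand() = default;
    virtual std::unique_ptr<FakeCommand> clone() const = 0;
};

struct FakeWireFormatInfo : FakeCommand {
    int version = 1;
    std::unique_ptr<FakeCommand> clone() const override {
        return std::make_unique<FakeWireFormatInfo>( *this );
    }
};

struct FakeConnectionInfo : FakeCommand {
    std::unique_ptr<FakeCommand> clone() const override {
        return std::make_unique<FakeConnectionInfo>( *this );
    }
};

// Emulates the broker side: looks at each outgoing command and queues
// whatever the broker would have sent back asynchronously.
class FakeResponseBuilder {
public:
    void buildIncomingCommands(
        const FakeCommand& outgoing,
        std::vector< std::unique_ptr<FakeCommand> >& replies ) {

        // Answer only the first wire format command, by echoing a copy,
        // so the client sees its preferred format as accepted.
        if( !wireFormatAnswered_ &&
            dynamic_cast<const FakeWireFormatInfo*>( &outgoing ) != nullptr ) {
            replies.push_back( outgoing.clone() );
            wireFormatAnswered_ = true;
        }
    }

private:
    bool wireFormatAnswered_ = false;
};

// Usage: the first wire format command is echoed, a repeat is ignored.
inline void demoResponseBuilder() {
    FakeResponseBuilder builder;
    std::vector< std::unique_ptr<FakeCommand> > replies;
    FakeWireFormatInfo info;
    builder.buildIncomingCommands( info, replies );
    builder.buildIncomingCommands( info, replies );
    assert( replies.size() == 1 );
}
```

Request/response pairs such as the Connect case would be handled in a
separate synchronous path; this sketch covers only the asynchronous
wire format reply.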

On Fri, 2007-06-15 at 07:07 -0700, Motl wrote:
> I have attached my version of OpenWireResponseBuilder and also modified
> version of DummyTransportFactory.
> Could you point to the places in the code where I should put the
> OpenWireFormatNegotiator::oneway() call?
> Or maybe it should be done in the DummyTransport file? Sorry for the
> annoying questions, but I have no idea about dataflow within activemq-cpp.
> 
> Thanks
> 
> 
> tabish121 wrote:
> > 
> > OpenWireFormatNegotiator is a TransportFilter; it wraps whatever transport
> > is passed into the OpenWireConnector.  The very first call to oneway (I
> > think it is) on this TransportFilter waits for the Broker to send its
> > WireFormatInfo, and then in onCommand it negotiates the
> > WireFormatInfo details, like the version of OpenWire that it supports.
> > 
> > Response doesn't need a setSessionId; look very closely at the
> > OpenWireConnector::createSession method, where we set the sessionId on the
> > outgoing message.
> > 
> > 
> > On Fri, 2007-06-15 at 05:06 -0700, Motl wrote:
> >> Sorry, I haven't found the place where OpenWireFormatNegotiator::start()
> >> is called, so I don't understand why it should be added to
> >> OpenWireResponseBuilder. Another question is:
> >> openwire::commands::Response doesn't have setSessionId method, how
> >> should I set SessionId for the generated response?
> >> 
> >> Thanks,
> >> Matvey
> >> 
> >> 
> >> tabish121 wrote:
> >> > 
> >> > Take a look at OpenWireFormatNegotiator.cpp; there is an initial
> >> > exchange of wireformat info between broker and client to establish
> >> > what version of openwire etc. each supports.
> >> > 
> >> > On Thu, 2007-06-14 at 08:42 -0700, Motl wrote:
> >> >> Sorry, what do you mean by sending WireFormatInfo at the initial
> >> >> connection?
> >> >> Isn't it done by createConnection() by specifying
> >> >> "wireFormat=openwire" in brokerURL?
> >> >> 
> >> >> Matvey
> >> >> 
> >> >> 
> >> >> tabish121 wrote:
> >> >> > 
> >> >> > If you look at the code in the OpenWireConnector you can get an
> >> >> > idea of what messages require what responses.  In the case of
> >> >> > Connect, there is a response that is needed, i.e. a Response
> >> >> > command.  You will also need to send a proper WireFormatInfo when
> >> >> > the initial connection is made so that the
> >> >> > OpenWireFormatNegotiator will get the response it's expecting.
> >> >> > 
> >> >> > A good way to see the message traffic is to connect to a broker
> >> >> > with transport.commandTracingEnabled=true in the URI.  This will
> >> >> > cause the C++ client to log to the console all the Commands that
> >> >> > are exchanged with the Broker; you can use this as a guide when
> >> >> > implementing the Response Builder class.
> >> >> > 
> >> >> > Regards
> >> >> > Tim
> >> >> > 
> >> >> > On Thu, 2007-06-14 at 07:00 -0700, Motl wrote:
> >> >> >> I am writing unit tests for the application that uses activemqcpp
> >> >> >> library.
> >> >> >> Using DummyTransport is nice for that purpose, but it only
> >> >> >> supports stomp at the moment, that misses TemporaryQueue
> >> >> >> implementation, and it's quite important functionality for me.
> >> >> >> Following your advice, I have started implementing
> >> >> >> OpenWireResponseBuilder. I don't know the internals of the
> >> >> >> activemq-cpp library, so it's quite a blindfold way for me )
> >> >> >> 
> >> >> >> Finally, the question is: which "openwire class" corresponds to
> >> >> >> "stomp class" commands::ConnectedCommand?
> >> >> >> 
> >> >> >> I tried to use openwire::BaseCommand template, but what class
> >> >> >> should I instantiate it with?
> >> >> >> 
> >> >> >> Thanx a lot
> >> >> > 
> >> >> > 
> >> >> 
> >> > 
> >> > 
> >> 
> > 
> > 
> http://www.nabble.com/file/p11140232/DummyTransport.h DummyTransport.h 
> http://www.nabble.com/file/p11140232/OpenWireResponseBuilder.h
> OpenWireResponseBuilder.h 
