activemq-users mailing list archives

From "Nathan Mittler" <nathan.mitt...@gmail.com>
Subject Re: activemq cms stomp - creating a topic from message->getCMSReplyTo() does not work
Date Fri, 02 Feb 2007 11:16:08 GMT
Great! will do!

On 2/2/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
>
>
> Hey Nate-
>
> It works, great! It works because the message->getCMSReplyTo() method now
> uses a ::cms::Destination, which makes things easier.
>
> This change and the introduction of setter/getter methods in the
> ::cms::Message class for different data types, like setInt(), setFloat(),
> setLong() etc., are cool changes. They help me do things in an easier way
> now :)
>
> Although I have to recode some parts due to these changes :(
>
> Thank you for all the support. Kindly close the bug.
>
> Lalit Nagpal
> CSA, SunGard
>
>
> Lalit Nagpal wrote:
> >
> >
> > Hi Nathan-
> >
> > Requesting to reopen the issue given at
> > https://issues.apache.org/activemq/browse/AMQCPP-64
> >
> > I had already tried the solution you provided; it throws a
> > StompException.
> > It is some kind of inconsistency in how activemq uses stomp.
> >
> >  http://www.nabble.com/file/6142/StompExceptionWithToStringMethod.jpg
> >
> > I appreciate all the help ...
> >
> > Thank you
> >
> > Lalit Nagpal
> > CSA, SunGard
> >
> >
> >
> > Lalit Nagpal wrote:
> >>
> >> Created a JIRA issue at
> >> https://issues.apache.org/activemq/browse/AMQCPP-64
> >>
> >> Attached: source code to reproduce and the image file for the output.
> >>
> >> Any rough timeline for when we can expect a solution for this?
> >>
> >> Thanks
> >>
> >> Lalit Nagpal
> >> CSA, SunGard
> >>
> >>
> >>
> >> nmittler wrote:
> >>>
> >>> Yep, something is definitely not right.  Please create a JIRA issue here
> >>> http://issues.apache.org/activemq/browse/AMQCPP
> >>>
> >>> ... be sure to attach the code and your output image.
> >>>
> >>> Thanks!
> >>> Nate
> >>>
> >>> On 1/30/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
> >>>>
> >>>>
> >>>> Copy pasting the entire sample main code to reproduce my issue
> >>>>
> >>>> #include "stdafx.h"
> >>>> #include <activemq/concurrent/Thread.h>
> >>>> #include <activemq/concurrent/Runnable.h>
> >>>> #include <activemq/core/ActiveMQConnectionFactory.h>
> >>>> #include <cms/Connection.h>
> >>>> #include <cms/Session.h>
> >>>> #include <cms/TextMessage.h>
> >>>> #include <cms/ExceptionListener.h>
> >>>> #include <cms/MessageListener.h>
> >>>> #include <stdlib.h>
> >>>> #include <stdio.h>
> >>>> #include <string.h>
> >>>> #include <iostream>
> >>>> #include <string>
> >>>>
> >>>> using namespace activemq::core;
> >>>> using namespace activemq::concurrent;
> >>>> using namespace cms;
> >>>> using namespace std;
> >>>>
> >>>> bool GenerateGuid(char* buf, size_t strsz)
> >>>> {
> >>>>         GUID a_guid;
> >>>>
> >>>>         UuidCreate(&a_guid);
> >>>>
> >>>>         if (strsz > 38)
> >>>>         {
> >>>>                 sprintf(buf,
> >>>>                         "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
> >>>>                         a_guid.Data1, a_guid.Data2, a_guid.Data3,
> >>>>                         a_guid.Data4[0], a_guid.Data4[1],
> >>>>                         a_guid.Data4[2], a_guid.Data4[3],
> >>>>                         a_guid.Data4[4], a_guid.Data4[5],
> >>>>                         a_guid.Data4[6], a_guid.Data4[7]
> >>>>                         );
> >>>>                 return true;
> >>>>         }
> >>>>         return false;
> >>>> };
> >>>>
> >>>> class HelloWorldProducer : public Runnable, public MessageListener {
> >>>> private:
> >>>>
> >>>>         Connection* connection;
> >>>>         Session* session;
> >>>>         Destination* destination;
> >>>>         MessageProducer* producer;
> >>>>         MessageConsumer* consumer;
> >>>>         int numMessages;
> >>>>
> >>>> public:
> >>>>
> >>>>         HelloWorldProducer( int numMessages ){
> >>>>                 connection = NULL;
> >>>>         session = NULL;
> >>>>         destination = NULL;
> >>>>         producer = NULL;
> >>>>         this->numMessages = numMessages;
> >>>>         }
> >>>>
> >>>>     virtual void onMessage( const Message* message ){
> >>>>
> >>>>         try
> >>>>         {
> >>>>             const TextMessage* textMessage =
> >>>>                 dynamic_cast< const TextMessage* >( message );
> >>>>             string text = textMessage->getText();
> >>>>             printf( "Producer Received: %s\n", text.c_str() );
> >>>>         } catch (CMSException& e) {
> >>>>             e.printStackTrace();
> >>>>         }
> >>>>     }
> >>>>
> >>>>         virtual ~HelloWorldProducer(){
> >>>>                 cleanup();
> >>>>         }
> >>>>
> >>>>     virtual void run() {
> >>>>         try {
> >>>>             // Create a ConnectionFactory
> >>>>             ActiveMQConnectionFactory* connectionFactory =
> >>>>                 new ActiveMQConnectionFactory("tcp://127.0.0.1:61617");
> >>>>
> >>>>             // Create a Connection
> >>>>             connection = connectionFactory->createConnection();
> >>>>             connection->start();
> >>>>
> >>>>             // Create a Session
> >>>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> >>>>
> >>>>             // Create the destination (Topic or Queue)
> >>>>             destination = session->createTopic( "TEST.FOO" );
> >>>>
> >>>>             // Create a MessageProducer from the Session to the Topic or Queue
> >>>>             cout << endl << "Producer has been registered at "
> >>>>                  << destination->toString() << endl << endl;
> >>>>             producer = session->createProducer( destination );
> >>>>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
> >>>>
> >>>>             // Let's have a reply-back channel to this producer now
> >>>>             char* charGuid = new char[40];
> >>>>             GenerateGuid(charGuid, 39);
> >>>>             cms::Topic* replyTopic = session->createTopic(charGuid);
> >>>>             consumer = session->createConsumer( replyTopic );
> >>>>             consumer->setMessageListener(this);
> >>>>             cout << "Reply back channel has been registered at "
> >>>>                  << replyTopic->toString() << endl << endl;
> >>>>
> >>>>             // Stringify the thread id
> >>>>             char threadIdStr[100];
> >>>>             _snprintf(threadIdStr, sizeof(threadIdStr), "%d", Thread::getId() );
> >>>>
> >>>>             // Create the message text
> >>>>             string text = (string)"Hello world! from thread " + threadIdStr;
> >>>>
> >>>>             for( int ix=0; ix<numMessages; ++ix ){
> >>>>                 TextMessage* message = session->createTextMessage( text );
> >>>>
> >>>>                 message->setCMSReplyTo(replyTopic->toProviderString());
> >>>>
> >>>>                 // Tell the producer to send the message
> >>>>                 printf( "Producer Sent message from thread %s\n", threadIdStr);
> >>>>                 producer->send( message );
> >>>>
> >>>>                 delete message;
> >>>>             }
> >>>>
> >>>>         }catch ( CMSException& e ) {
> >>>>             e.printStackTrace();
> >>>>         }
> >>>>     }
> >>>>
> >>>> private:
> >>>>
> >>>>     void cleanup(){
> >>>>
> >>>>                         // Destroy resources.
> >>>>                         try{
> >>>>                 if( destination != NULL ) delete destination;
> >>>>                         }catch ( CMSException& e ) {}
> >>>>                         destination = NULL;
> >>>>
> >>>>                         try{
> >>>>                     if( producer != NULL ) delete producer;
> >>>>                         }catch ( CMSException& e ) {}
> >>>>                         producer = NULL;
> >>>>
> >>>>                 // Close open resources.
> >>>>                 try{
> >>>>                         if( session != NULL ) session->close();
> >>>>                         if( connection != NULL ) connection->close();
> >>>>                         }catch ( CMSException& e ) {}
> >>>>
> >>>>                         try{
> >>>>                 if( session != NULL ) delete session;
> >>>>                         }catch ( CMSException& e ) {}
> >>>>                         session = NULL;
> >>>>
> >>>>             try{
> >>>>                 if( connection != NULL ) delete connection;
> >>>>                         }catch ( CMSException& e ) {}
> >>>>                 connection = NULL;
> >>>>     }
> >>>> };
> >>>>
> >>>> class HelloWorldConsumer : public ExceptionListener,
> >>>>                            public MessageListener,
> >>>>                            public Runnable {
> >>>>
> >>>> private:
> >>>>
> >>>>         Connection* connection;
> >>>>         Session* session;
> >>>>         Destination* destination;
> >>>>         MessageConsumer* consumer;
> >>>>         long waitMillis;
> >>>>
> >>>> public:
> >>>>
> >>>>         HelloWorldConsumer( long waitMillis ){
> >>>>                 connection = NULL;
> >>>>         session = NULL;
> >>>>         destination = NULL;
> >>>>         consumer = NULL;
> >>>>         this->waitMillis = waitMillis;
> >>>>         }
> >>>>     virtual ~HelloWorldConsumer(){
> >>>>         cleanup();
> >>>>     }
> >>>>
> >>>>     virtual void run() {
> >>>>
> >>>>         try {
> >>>>
> >>>>             // Create a ConnectionFactory
> >>>>             ActiveMQConnectionFactory* connectionFactory =
> >>>>                 new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );
> >>>>
> >>>>             // Create a Connection
> >>>>             connection = connectionFactory->createConnection();
> >>>>             delete connectionFactory;
> >>>>             connection->start();
> >>>>
> >>>>             connection->setExceptionListener(this);
> >>>>
> >>>>             // Create a Session
> >>>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> >>>>
> >>>>             // Create the destination (Topic or Queue)
> >>>>             destination = session->createTopic( "TEST.FOO" );
> >>>>
> >>>>             // Create a MessageConsumer from the Session to the Topic or Queue
> >>>>             consumer = session->createConsumer( destination );
> >>>>
> >>>>             consumer->setMessageListener( this );
> >>>>
> >>>>             // Sleep while asynchronous messages come in.
> >>>>             Thread::sleep( waitMillis );
> >>>>
> >>>>         } catch (CMSException& e) {
> >>>>             e.printStackTrace();
> >>>>         }
> >>>>     }
> >>>>
> >>>>     virtual void onMessage( const Message* message ){
> >>>>
> >>>>         try
> >>>>         {
> >>>>                         // display the message received
> >>>>             const TextMessage* textMessage =
> >>>>                 dynamic_cast< const TextMessage* >( message );
> >>>>             string text = textMessage->getText();
> >>>>             printf( "Consumer Received: %s\n", text.c_str() );
> >>>>
> >>>>             // Let's reply back with a thanks. strcmp() returns 0 only on
> >>>>             // equality, so test != 0 rather than == 1.
> >>>>             if (strcmp("null", textMessage->getCMSReplyTo().c_str()) != 0 ) {
> >>>>                 cms::Topic* destination_ = session->createTopic(
> >>>>                     textMessage->getCMSReplyTo().c_str() );
> >>>>                 MessageProducer* producer =
> >>>>                     session->createProducer( destination_ );
> >>>>                 producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
> >>>>                 cout << endl << "Consumer Replying back to "
> >>>>                      << destination_->toString() << endl;
> >>>>                 TextMessage* message_ = session->createTextMessage(
> >>>>                     "Thank you for Hello World !!!" );
> >>>>                 producer->send(message_);
> >>>>             }
> >>>>
> >>>>         } catch (CMSException& e) {
> >>>>             e.printStackTrace();
> >>>>         }
> >>>>     }
> >>>>
> >>>>     virtual void onException( const CMSException& ex ) {
> >>>>         printf("JMS Exception occurred.  Shutting down client.\n");
> >>>>     }
> >>>>
> >>>> private:
> >>>>
> >>>>     void cleanup(){
> >>>>
> >>>>                 // Destroy resources.
> >>>>                 try{
> >>>>                 if( destination != NULL ) delete destination;
> >>>>                 }catch (CMSException& e) {}
> >>>>                 destination = NULL;
> >>>>
> >>>>                 try{
> >>>>             if( consumer != NULL ) delete consumer;
> >>>>                 }catch (CMSException& e) {}
> >>>>                 consumer = NULL;
> >>>>
> >>>>                 // Close open resources.
> >>>>                 try{
> >>>>                         if( session != NULL ) session->close();
> >>>>                         if( connection != NULL ) connection->close();
> >>>>                 }catch (CMSException& e) {}
> >>>>
> >>>>         try{
> >>>>                 if( session != NULL ) delete session;
> >>>>                 }catch (CMSException& e) {}
> >>>>                 session = NULL;
> >>>>
> >>>>                 try{
> >>>>                 if( connection != NULL ) delete connection;
> >>>>                 }catch (CMSException& e) {}
> >>>>                 connection = NULL;
> >>>>     }
> >>>> };
> >>>>
> >>>> int _tmain(int argc, _TCHAR* argv[])
> >>>> {
> >>>>     HelloWorldProducer producer( 5 );
> >>>>         HelloWorldConsumer consumer( 1500 );
> >>>>
> >>>>         // Start the consumer thread.
> >>>>         Thread consumerThread( &consumer );
> >>>>         consumerThread.start();
> >>>>
> >>>>         // Start the producer thread.
> >>>>         Thread producerThread( &producer );
> >>>>         producerThread.start();
> >>>>
> >>>>         // Wait for the threads to complete.
> >>>>         producerThread.join();
> >>>>         consumerThread.join();
> >>>>
> >>>>         cout << endl << endl << endl;
> >>>>
> >>>>         return 0;
> >>>> }
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>>
> >>>> Lalit Nagpal wrote:
> >>>> >
> >>>> > Hi
> >>>> >
> >>>> > I am using the activemq-cpp CMS API 1.0 release. The problem I am
> >>>> > facing is this: my producer sends a message to the consumer, and the
> >>>> > receiving end should then send a message back as a reply. Consider a
> >>>> > situation where a loginRequest message has been sent and a loginReply
> >>>> > message should now be sent from the receiving end.
> >>>> >
> >>>> > Attached is a sample main that reproduces the problem I am facing. I
> >>>> > have modified the sample helloproducer/helloconsumer code available at
> >>>> > http://activemq.org/site/activemq-cpp-client.html
> >>>> > to reproduce my problem so that it is easier for you to see.
> >>>> >
> >>>> >  http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp
> >>>> >
> >>>> > If you execute this piece of code you will see the output as in the
> >>>> > attached image file
> >>>> >
> >>>> >  http://www.nabble.com/file/6110/DestinationProblem.JPG
> >>>> >
> >>>> > This is what the code does:
> >>>> > The producer (let's call it xxx) sends "Hello world! from thread xxxx"
> >>>> > to the consumer.
> >>>> > The consumer (let's call it yyy) receives the message and displays it.
> >>>> > This is the normal behavior as given in the example on
> >>>> > http://activemq.org/site/activemq-cpp-client.html
> >>>> > The following extra step needs to be done now: a reply should go back
> >>>> > from yyy to xxx. For this I registered a producer at yyy by creating a
> >>>> > topic using message->getCMSReplyTo() and then replying back to that
> >>>> > destination.
> >>>> >
> >>>> > The mismatch can easily be seen by running bstat. When I initially
> >>>> > created the consumer yyy, it created a topic named, say, ABCDEFGH
> >>>> > (which is a random id). Later, when I used message->getCMSReplyTo() to
> >>>> > create a topic, the topic was registered with the name
> >>>> > /topic/ABCDEFGH. The additional /topic/ that has been added is what
> >>>> > messes things up here: the replies from yyy to xxx are not reaching
> >>>> > xxx (they are enqueued but not dequeued).
> >>>> >
> >>>> > The /topic/ gets added by the following statement in the
> >>>> > HelloWorldProducer run method:
> >>>> > message->setCMSReplyTo(replyTopic->toProviderString());
> >>>> > It is the toProviderString() method that actually adds it. If you
> >>>> > replace this method with just the toString() method, you will get a
> >>>> > stomp exception saying that destinations should start with either
> >>>> > /topic/ or /queue/
> >>>> >
> >>>> > Can somebody make this code work please.
> >>>> >
> >>>> > For every message sent by the producer to consumer "Hello world! from
> >>>> > thread xxxx" there should be a reply coming back as "Thank you for
> >>>> > Hello World !!!"
> >>>> >
> >>>> > Please help me urgently here.
> >>>> >
> >>>> > Thank you in advance
> >>>> >
> >>>> > Lalit Nagpal
> >>>> > CSA, SunGard
> >>>> >
> >>>> >
> >>>>
> >>>> --
> >>>> View this message in context:
> >>>>
> http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
> >>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>>>
> >>>>
> >>>
> >>>
> >>
> >>
> >
> >
>
> --
> View this message in context:
> http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8764227
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
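
For anyone still on the string-based 1.0 API, the name mismatch described in the original report (ABCDEFGH on one side, /topic/ABCDEFGH on the other) can be illustrated with a small standalone sketch. The helper names stripStompPrefix and hasReplyTo are hypothetical and not part of CMS or activemq-cpp. Whether passing the stripped name to createTopic() avoids the problem on a given release is an assumption, not something confirmed in this thread; the fix confirmed above is the Destination-based reply-to in the newer code.

```cpp
#include <string>

// Hypothetical helper (not part of the CMS API): removes a leading
// "/topic/" or "/queue/" from a Stomp provider string, if present.
std::string stripStompPrefix(const std::string& providerName) {
    static const std::string prefixes[] = { "/topic/", "/queue/" };
    for (const std::string& prefix : prefixes) {
        // compare() checks only the first prefix.size() characters.
        if (providerName.compare(0, prefix.size(), prefix) == 0) {
            return providerName.substr(prefix.size());
        }
    }
    return providerName;  // no known prefix, return unchanged
}

// Hypothetical helper: true when a reply-to string names a destination.
// Assumes, as in the sample code, that a missing reply-to can show up
// as the literal string "null".
bool hasReplyTo(const std::string& replyTo) {
    return !replyTo.empty() && replyTo != "null";
}
```

With the names used in the report, stripStompPrefix("/topic/ABCDEFGH") gives "ABCDEFGH", and hasReplyTo("null") is false. Comparing std::string values directly also sidesteps the strcmp() return-value question in the consumer's onMessage().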
