activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lalit Nagpal <lalitte...@rediffmail.com>
Subject Re: activemq cms stomp - creating a topic from message->getCMSReplyTo() does not work
Date Wed, 31 Jan 2007 12:25:31 GMT


created JIRA issue at
https://issues.apache.org/activemq/browse/AMQCPP-64

Attached the source code to reproduce and the image file for output.

Thanks

Lalit Nagpal
CSA, SunGard



nmittler wrote:
> 
> Yep, something is definitely not right.  Please create a JIRA issue here
> http://issues.apache.org/activemq/browse/AMQCPP
> 
> ... be sure to attach the code and your output image.
> 
> Thanks!
> Nate
> 
> On 1/30/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
>>
>>
>> Copy pasting the entire sample main code to reproduce my issue
>>
>> #include "stdafx.h"
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>> #include <stdio.h>
>>
>> using namespace activemq::core;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>>
>> bool GenerateGuid(char* buf, size_t strsz)
>> {
>>         GUID a_guid;
>>
>>         UuidCreate(&a_guid);
>>
>>         if (strsz > 38)
>>         {
>>                 sprintf(buf,
>>
>>                                
>> "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
>>                                 a_guid.Data1, a_guid.Data2, a_guid.Data3,
>>                                 a_guid.Data4[0], a_guid.Data4[1],
>>                                 a_guid.Data4[2], a_guid.Data4[3],
>>                                 a_guid.Data4[4], a_guid.Data4[5],
>>                                 a_guid.Data4[6], a_guid.Data4[7]
>>                                 );
>>                 return true;
>>         }
>>         return false;
>> };
>>
>> class HelloWorldProducer : public Runnable, public MessageListener {
>> private:
>>
>>         Connection* connection;
>>         Session* session;
>>         Destination* destination;
>>         MessageProducer* producer;
>>         MessageConsumer* consumer;
>>         int numMessages;
>>
>> public:
>>
>>         HelloWorldProducer( int numMessages ){
>>                 connection = NULL;
>>         session = NULL;
>>         destination = NULL;
>>         producer = NULL;
>>         this->numMessages = numMessages;
>>         }
>>
>>     virtual void onMessage( const Message* message ){
>>
>>         try
>>         {
>>             const TextMessage* textMessage = dynamic_cast< const
>> TextMessage*
>> >( message );
>>             string text = textMessage->getText();
>>             printf( "Producer Received: %s\n", text.c_str() );
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>         virtual ~HelloWorldProducer(){
>>                 cleanup();
>>         }
>>
>>     virtual void run() {
>>         try {
>>             // Create a ConnectionFactory
>>             ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://127.0.0.1:61617");
>>
>>             // Create a Connection
>>             connection = connectionFactory->createConnection();
>>             connection->start();
>>
>>             // Create a Session
>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>> );
>>
>>             // Create the destination (Topic or Queue)
>>             destination = session->createTopic( "TEST.FOO" );
>>
>>             // Create a MessageProducer from the Session to the Topic or
>> Queue
>>                         cout << endl << "Producer has been registered
at
>> "
>> <<
>> destination->toString() << endl << endl;
>>             producer = session->createProducer( destination );
>>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
>>
>>                         // Lets have a reply-back channel to this
>> producer
>> now
>>                         char* charGuid = new char[40];
>>                         GenerateGuid(charGuid, 39);
>>                         cms::Topic* replyTopic =
>> session->createTopic(charGuid);
>>                         consumer = session->createConsumer( replyTopic );
>>                         consumer->setMessageListener(this);
>>                         cout << "Reply back channel has been registered
>> at
>> " <<
>> replyTopic->toString() << endl << endl;
>>
>>             // Stringify the thread id
>>             char threadIdStr[100];
>>             _snprintf(threadIdStr, sizeof(threadIdStr), "%d",
>> Thread::getId() );
>>
>>             // Create a messages
>>             string text = (string)"Hello world! from thread " +
>> threadIdStr;
>>
>>             for( int ix=0; ix<numMessages; ++ix ){
>>                     TextMessage* message = session->createTextMessage(
>> text );
>>
>>                                
>> message->setCMSReplyTo(replyTopic->toProviderString());
>>
>>                 // Tell the producer to send the message
>>                     printf( "Producer Sent message from thread %s\n",
>> threadIdStr);
>>                 producer->send( message );
>>
>>                 delete message;
>>             }
>>
>>         }catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>> private:
>>
>>     void cleanup(){
>>
>>                         // Destroy resources.
>>                         try{
>>                 if( destination != NULL ) delete destination;
>>                         }catch ( CMSException& e ) {}
>>                         destination = NULL;
>>
>>                         try{
>>                     if( producer != NULL ) delete producer;
>>                         }catch ( CMSException& e ) {}
>>                         producer = NULL;
>>
>>                 // Close open resources.
>>                 try{
>>                         if( session != NULL ) session->close();
>>                         if( connection != NULL ) connection->close();
>>                         }catch ( CMSException& e ) {}
>>
>>                         try{
>>                 if( session != NULL ) delete session;
>>                         }catch ( CMSException& e ) {}
>>                         session = NULL;
>>
>>             try{
>>                 if( connection != NULL ) delete connection;
>>                         }catch ( CMSException& e ) {}
>>                 connection = NULL;
>>     }
>> };
>>
>> class HelloWorldConsumer : public ExceptionListener,
>>                            public MessageListener,
>>                            public Runnable {
>>
>> private:
>>
>>         Connection* connection;
>>         Session* session;
>>         Destination* destination;
>>         MessageConsumer* consumer;
>>         long waitMillis;
>>
>> public:
>>
>>         HelloWorldConsumer( long waitMillis ){
>>                 connection = NULL;
>>         session = NULL;
>>         destination = NULL;
>>         consumer = NULL;
>>         this->waitMillis = waitMillis;
>>         }
>>     virtual ~HelloWorldConsumer(){
>>         cleanup();
>>     }
>>
>>     virtual void run() {
>>
>>         try {
>>
>>             // Create a ConnectionFactory
>>             ActiveMQConnectionFactory* connectionFactory =
>>                 new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );
>>
>>             // Create a Connection
>>             connection = connectionFactory->createConnection();
>>             delete connectionFactory;
>>             connection->start();
>>
>>             connection->setExceptionListener(this);
>>
>>             // Create a Session
>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>> );
>>
>>             // Create the destination (Topic or Queue)
>>             destination = session->createTopic( "TEST.FOO" );
>>
>>             // Create a MessageConsumer from the Session to the Topic or
>> Queue
>>             consumer = session->createConsumer( destination );
>>
>>             consumer->setMessageListener( this );
>>
>>             // Sleep while asynchronous messages come in.
>>             Thread::sleep( waitMillis );
>>
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     virtual void onMessage( const Message* message ){
>>
>>         try
>>         {
>>                         // display the message received
>>             const TextMessage* textMessage = dynamic_cast< const
>> TextMessage*
>> >( message );
>>             string text = textMessage->getText();
>>             printf( "Consumer Received: %s\n", text.c_str() );
>>
>>                         // lets reply back with a thanks
>>                         if (textMessage->getCMSReplyTo().c_str() &&
>> strcmp("null",
>> textMessage->getCMSReplyTo().c_str())==1 ) {
>>                                 cms::Topic* destination_ =
>> session->createTopic(
>> textMessage->getCMSReplyTo().c_str() );
>>                                 MessageProducer* producer =
>> session->createProducer( destination_ );
>>                                 producer->setDeliveryMode(
>> DeliveryMode::NON_PERSISTANT );
>>                                 cout << endl << "Consumer Replying back
>> to
>> " << destination_->toString()
>> << endl;
>>                                 TextMessage* message_ =
>> session->createTextMessage( "Thank you for Hello
>> World !!!" );
>>                                 producer->send(message_);
>>                         }
>>
>>         } catch (CMSException& e) {
>>             e.printStackTrace();
>>         }
>>     }
>>
>>     virtual void onException( const CMSException& ex ) {
>>         printf("JMS Exception occured.  Shutting down client.\n");
>>     }
>>
>> private:
>>
>>     void cleanup(){
>>
>>                 // Destroy resources.
>>                 try{
>>                 if( destination != NULL ) delete destination;
>>                 }catch (CMSException& e) {}
>>                 destination = NULL;
>>
>>                 try{
>>             if( consumer != NULL ) delete consumer;
>>                 }catch (CMSException& e) {}
>>                 consumer = NULL;
>>
>>                 // Close open resources.
>>                 try{
>>                         if( session != NULL ) session->close();
>>                         if( connection != NULL ) connection->close();
>>                 }catch (CMSException& e) {}
>>
>>         try{
>>                 if( session != NULL ) delete session;
>>                 }catch (CMSException& e) {}
>>                 session = NULL;
>>
>>                 try{
>>                 if( connection != NULL ) delete connection;
>>                 }catch (CMSException& e) {}
>>                 connection = NULL;
>>     }
>> };
>>
>> int _tmain(int argc, _TCHAR* argv[])
>> {
>>     HelloWorldProducer producer( 5 );
>>         HelloWorldConsumer consumer( 1500 );
>>
>>         // Start the consumer thread.
>>         Thread consumerThread( &consumer );
>>         consumerThread.start();
>>
>>         // Start the producer thread.
>>         Thread producerThread( &producer );
>>         producerThread.start();
>>
>>         // Wait for the threads to complete.
>>         producerThread.join();
>>         consumerThread.join();
>>
>>         cout << endl << endl << endl;
>>
>>         return 0;
>> }
>>
>>
>>
>>
>>
>>
>> Lalit Nagpal wrote:
>> >
>> > Hi
>> >
>> > I am using the activemq-cpp cms api 1.0 release. The problem I am
>> facing
>> > is like this-
>> > My producer sends a message to the consumer and a message should be
>> sent
>> > from the receiving end as a reply after this - consider a situation
>> where
>> > a loginRequest message has been sent and now a loginReply message
>> should
>> > be sent from the receiving end.
>> >
>> > Attached is a sample main that can reproduce the problem I am facing -
>> I
>> > have modified the sample helloproducer helloconsumer code available at
>> > http://activemq.org/site/activemq-cpp-client.html
>> > to reproduce my problem so that it's easier for you to see.
>> >
>> >  http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp
>> >
>> > If you execute this piece of code you will see the output as in the
>> > attached image file
>> >
>> >  http://www.nabble.com/file/6110/DestinationProblem.JPG
>> >
>> > This is what the code does
>> > producer lets call it xxx sends a "Hello world! from thread xxxx" to
>> the
>> > consumer
>> > consumer lets call it yyy receives the message and displays it
>> > this is the normal behavior as given in the example on
>> > http://activemq.org/site/activemq-cpp-client.html
>> > Following extra needs to be done now
>> > from yyy a reply should go back to xxx ... for this i registered a
>> > producer at yyy by creating a topic using the message->getCMSReplyTo()
>> and
>> > then replying back to that destination.
>> >
>> > The mismatch can easily be seen by doing a bstat .... when I created the
>> > consumer yyy initially it created a topic by name say ABCDEFGH (which
>> is
>> a
>> > random id) and later on when I used the message->getCMSReplyTo() to
>> create
>> > a topic the topic was registered with the name /topic/ABCDEFGH .....
>> the
>> > additional /topic/ that has got added is messing things up here and the
>> > replies from yyy to xxx are not reaching xxx (getting enqueued and not
>> > dequeued) ...
>> >
>> > the /topic/ gets added due to the following statement in
>> > HelloWorldProducer - run method
>> > message->setCMSReplyTo(replyTopic->toProviderString());
>> > here the toProviderString method adds it actually ... if you replace
>> this
>> > method with just the toString() method ... you will get a stomp
>> exception
>> > saying that destinations should start with either /topic/ or /queue/
>> >
>> > Can somebody make this code work please.
>> >
>> > For every message sent by the producer to consumer "Hello world! from
>> > thread xxxx" there should be a reply coming back as "Thank you for
>> Hello
>> > World !!!"
>> >
>> > Please help me urgently here.
>> >
>> > Thank you in advance
>> >
>> > Lalit Nagpal
>> > CSA, SunGard
>> >
>> >
>>
>> --
>> View this message in context:
>> http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>
>>
> 
> 

-- 
View this message in context: http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8728052
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message