activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lalit Nagpal <lalitte...@rediffmail.com>
Subject Re: activemq cms stomp - creating a topic from message->getCMSReplyTo() does not work
Date Fri, 02 Feb 2007 09:54:31 GMT

Hey Nate-

It works .... great .... it works bcoz now message->getCMSReply() method
accepts a ::cms::Destination ... that makes it easier now .... 

This change and the introduction of setter/getter methods in the
::cms::Message class for different data types like setInt() setFloat()
setLong() etc are cool changes .... they help me do things in a easier way
now .... :)

although I have to recode some parts due to these changes :(

Thank you for all the support .... kindly close the bug

Lalit Nagpal
CSA, SunGard


Lalit Nagpal wrote:
> 
> 
> Hi Nathan-
> 
> Requesting to reopen the issue given at
> https://issues.apache.org/activemq/browse/AMQCPP-64
> 
> I had already tried the solution you have provided ... it throws a
> StompException ....
> It is some kind of a inconsistency between how activemq uses stomp ...
> 
>  http://www.nabble.com/file/6142/StompExceptionWithToStringMethod.jpg 
> 
> I appreciate all the help ... 
> 
> Thank you
> 
> Lalit Nagpal
> CSA, SunGard
> 
> 
> 
> Lalit Nagpal wrote:
>> 
>> created JIRA issue at
>> https://issues.apache.org/activemq/browse/AMQCPP-64
>> 
>> attched = source code to reproduce and the image file for output.
>> 
>> Any rough timelines when we can expect a solution for this.
>> 
>> Thanks
>> 
>> Lalit Nagpal
>> CSA, SunGard
>> 
>> 
>> 
>> nmittler wrote:
>>> 
>>> Yep, something is definitely not right.  Please create a JIRA issue here
>>> http://issues.apache.org/activemq/browse/AMQCPP
>>> 
>>> ... be sure to attach the code and your output image.
>>> 
>>> Thanks!
>>> Nate
>>> 
>>> On 1/30/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
>>>>
>>>>
>>>> Copy pasting the entire sample main code to reproduce my issue
>>>>
>>>> #include "stdafx.h"
>>>> #include <activemq/concurrent/Thread.h>
>>>> #include <activemq/concurrent/Runnable.h>
>>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>>> #include <cms/Connection.h>
>>>> #include <cms/Session.h>
>>>> #include <cms/TextMessage.h>
>>>> #include <cms/ExceptionListener.h>
>>>> #include <cms/MessageListener.h>
>>>> #include <stdlib.h>
>>>> #include <stdio.h>
>>>>
>>>> using namespace activemq::core;
>>>> using namespace activemq::concurrent;
>>>> using namespace cms;
>>>> using namespace std;
>>>>
>>>> bool GenerateGuid(char* buf, size_t strsz)
>>>> {
>>>>         GUID a_guid;
>>>>
>>>>         UuidCreate(&a_guid);
>>>>
>>>>         if (strsz > 38)
>>>>         {
>>>>                 sprintf(buf,
>>>>
>>>>                                
>>>> "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
>>>>                                 a_guid.Data1, a_guid.Data2,
>>>> a_guid.Data3,
>>>>                                 a_guid.Data4[0], a_guid.Data4[1],
>>>>                                 a_guid.Data4[2], a_guid.Data4[3],
>>>>                                 a_guid.Data4[4], a_guid.Data4[5],
>>>>                                 a_guid.Data4[6], a_guid.Data4[7]
>>>>                                 );
>>>>                 return true;
>>>>         }
>>>>         return false;
>>>> };
>>>>
>>>> class HelloWorldProducer : public Runnable, public MessageListener {
>>>> private:
>>>>
>>>>         Connection* connection;
>>>>         Session* session;
>>>>         Destination* destination;
>>>>         MessageProducer* producer;
>>>>         MessageConsumer* consumer;
>>>>         int numMessages;
>>>>
>>>> public:
>>>>
>>>>         HelloWorldProducer( int numMessages ){
>>>>                 connection = NULL;
>>>>         session = NULL;
>>>>         destination = NULL;
>>>>         producer = NULL;
>>>>         this->numMessages = numMessages;
>>>>         }
>>>>
>>>>     virtual void onMessage( const Message* message ){
>>>>
>>>>         try
>>>>         {
>>>>             const TextMessage* textMessage = dynamic_cast< const
>>>> TextMessage*
>>>> >( message );
>>>>             string text = textMessage->getText();
>>>>             printf( "Producer Received: %s\n", text.c_str() );
>>>>         } catch (CMSException& e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>>         virtual ~HelloWorldProducer(){
>>>>                 cleanup();
>>>>         }
>>>>
>>>>     virtual void run() {
>>>>         try {
>>>>             // Create a ConnectionFactory
>>>>             ActiveMQConnectionFactory* connectionFactory = new
>>>> ActiveMQConnectionFactory("tcp://127.0.0.1:61617");
>>>>
>>>>             // Create a Connection
>>>>             connection = connectionFactory->createConnection();
>>>>             connection->start();
>>>>
>>>>             // Create a Session
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> );
>>>>
>>>>             // Create the destination (Topic or Queue)
>>>>             destination = session->createTopic( "TEST.FOO" );
>>>>
>>>>             // Create a MessageProducer from the Session to the Topic
>>>> or
>>>> Queue
>>>>                         cout << endl << "Producer has been registered
>>>> at "
>>>> <<
>>>> destination->toString() << endl << endl;
>>>>             producer = session->createProducer( destination );
>>>>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
>>>>
>>>>                         // Lets have a reply-back channel to this
>>>> producer
>>>> now
>>>>                         char* charGuid = new char[40];
>>>>                         GenerateGuid(charGuid, 39);
>>>>                         cms::Topic* replyTopic =
>>>> session->createTopic(charGuid);
>>>>                         consumer = session->createConsumer( replyTopic
>>>> );
>>>>                         consumer->setMessageListener(this);
>>>>                         cout << "Reply back channel has been registered
>>>> at
>>>> " <<
>>>> replyTopic->toString() << endl << endl;
>>>>
>>>>             // Stringify the thread id
>>>>             char threadIdStr[100];
>>>>             _snprintf(threadIdStr, sizeof(threadIdStr), "%d",
>>>> Thread::getId() );
>>>>
>>>>             // Create a messages
>>>>             string text = (string)"Hello world! from thread " +
>>>> threadIdStr;
>>>>
>>>>             for( int ix=0; ix<numMessages; ++ix ){
>>>>                     TextMessage* message = session->createTextMessage(
>>>> text );
>>>>
>>>>                                
>>>> message->setCMSReplyTo(replyTopic->toProviderString());
>>>>
>>>>                 // Tell the producer to send the message
>>>>                     printf( "Producer Sent message from thread %s\n",
>>>> threadIdStr);
>>>>                 producer->send( message );
>>>>
>>>>                 delete message;
>>>>             }
>>>>
>>>>         }catch ( CMSException& e ) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>> private:
>>>>
>>>>     void cleanup(){
>>>>
>>>>                         // Destroy resources.
>>>>                         try{
>>>>                 if( destination != NULL ) delete destination;
>>>>                         }catch ( CMSException& e ) {}
>>>>                         destination = NULL;
>>>>
>>>>                         try{
>>>>                     if( producer != NULL ) delete producer;
>>>>                         }catch ( CMSException& e ) {}
>>>>                         producer = NULL;
>>>>
>>>>                 // Close open resources.
>>>>                 try{
>>>>                         if( session != NULL ) session->close();
>>>>                         if( connection != NULL ) connection->close();
>>>>                         }catch ( CMSException& e ) {}
>>>>
>>>>                         try{
>>>>                 if( session != NULL ) delete session;
>>>>                         }catch ( CMSException& e ) {}
>>>>                         session = NULL;
>>>>
>>>>             try{
>>>>                 if( connection != NULL ) delete connection;
>>>>                         }catch ( CMSException& e ) {}
>>>>                 connection = NULL;
>>>>     }
>>>> };
>>>>
>>>> class HelloWorldConsumer : public ExceptionListener,
>>>>                            public MessageListener,
>>>>                            public Runnable {
>>>>
>>>> private:
>>>>
>>>>         Connection* connection;
>>>>         Session* session;
>>>>         Destination* destination;
>>>>         MessageConsumer* consumer;
>>>>         long waitMillis;
>>>>
>>>> public:
>>>>
>>>>         HelloWorldConsumer( long waitMillis ){
>>>>                 connection = NULL;
>>>>         session = NULL;
>>>>         destination = NULL;
>>>>         consumer = NULL;
>>>>         this->waitMillis = waitMillis;
>>>>         }
>>>>     virtual ~HelloWorldConsumer(){
>>>>         cleanup();
>>>>     }
>>>>
>>>>     virtual void run() {
>>>>
>>>>         try {
>>>>
>>>>             // Create a ConnectionFactory
>>>>             ActiveMQConnectionFactory* connectionFactory =
>>>>                 new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617"
>>>> );
>>>>
>>>>             // Create a Connection
>>>>             connection = connectionFactory->createConnection();
>>>>             delete connectionFactory;
>>>>             connection->start();
>>>>
>>>>             connection->setExceptionListener(this);
>>>>
>>>>             // Create a Session
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> );
>>>>
>>>>             // Create the destination (Topic or Queue)
>>>>             destination = session->createTopic( "TEST.FOO" );
>>>>
>>>>             // Create a MessageConsumer from the Session to the Topic
>>>> or
>>>> Queue
>>>>             consumer = session->createConsumer( destination );
>>>>
>>>>             consumer->setMessageListener( this );
>>>>
>>>>             // Sleep while asynchronous messages come in.
>>>>             Thread::sleep( waitMillis );
>>>>
>>>>         } catch (CMSException& e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>>     virtual void onMessage( const Message* message ){
>>>>
>>>>         try
>>>>         {
>>>>                         // display the message received
>>>>             const TextMessage* textMessage = dynamic_cast< const
>>>> TextMessage*
>>>> >( message );
>>>>             string text = textMessage->getText();
>>>>             printf( "Consumer Received: %s\n", text.c_str() );
>>>>
>>>>                         // lets reply back with a thanks
>>>>                         if (textMessage->getCMSReplyTo().c_str() &&
>>>> strcmp("null",
>>>> textMessage->getCMSReplyTo().c_str())==1 ) {
>>>>                                 cms::Topic* destination_ =
>>>> session->createTopic(
>>>> textMessage->getCMSReplyTo().c_str() );
>>>>                                 MessageProducer* producer =
>>>> session->createProducer( destination_ );
>>>>                                 producer->setDeliveryMode(
>>>> DeliveryMode::NON_PERSISTANT );
>>>>                                 cout << endl << "Consumer Replying
back
>>>> to
>>>> " << destination_->toString()
>>>> << endl;
>>>>                                 TextMessage* message_ =
>>>> session->createTextMessage( "Thank you for Hello
>>>> World !!!" );
>>>>                                 producer->send(message_);
>>>>                         }
>>>>
>>>>         } catch (CMSException& e) {
>>>>             e.printStackTrace();
>>>>         }
>>>>     }
>>>>
>>>>     virtual void onException( const CMSException& ex ) {
>>>>         printf("JMS Exception occured.  Shutting down client.\n");
>>>>     }
>>>>
>>>> private:
>>>>
>>>>     void cleanup(){
>>>>
>>>>                 // Destroy resources.
>>>>                 try{
>>>>                 if( destination != NULL ) delete destination;
>>>>                 }catch (CMSException& e) {}
>>>>                 destination = NULL;
>>>>
>>>>                 try{
>>>>             if( consumer != NULL ) delete consumer;
>>>>                 }catch (CMSException& e) {}
>>>>                 consumer = NULL;
>>>>
>>>>                 // Close open resources.
>>>>                 try{
>>>>                         if( session != NULL ) session->close();
>>>>                         if( connection != NULL ) connection->close();
>>>>                 }catch (CMSException& e) {}
>>>>
>>>>         try{
>>>>                 if( session != NULL ) delete session;
>>>>                 }catch (CMSException& e) {}
>>>>                 session = NULL;
>>>>
>>>>                 try{
>>>>                 if( connection != NULL ) delete connection;
>>>>                 }catch (CMSException& e) {}
>>>>                 connection = NULL;
>>>>     }
>>>> };
>>>>
>>>> int _tmain(int argc, _TCHAR* argv[])
>>>> {
>>>>     HelloWorldProducer producer( 5 );
>>>>         HelloWorldConsumer consumer( 1500 );
>>>>
>>>>         // Start the consumer thread.
>>>>         Thread consumerThread( &consumer );
>>>>         consumerThread.start();
>>>>
>>>>         // Start the producer thread.
>>>>         Thread producerThread( &producer );
>>>>         producerThread.start();
>>>>
>>>>         // Wait for the threads to complete.
>>>>         producerThread.join();
>>>>         consumerThread.join();
>>>>
>>>>         cout << endl << endl << endl;
>>>>
>>>>         return 0;
>>>> }
>>>>
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> Lalit Nagpal wrote:
>>>> >
>>>> > Hi
>>>> >
>>>> > I am using the activemq-cpp cms api 1.0 release. The problem I am
>>>> facing
>>>> > is like this-
>>>> > My producer sends a message to the consumer and a message should be
>>>> sent
>>>> > from the receiving end as a reply after this - consider a situation
>>>> where
>>>> > a loginRequest message has been sent and now a loginReply message
>>>> should
>>>> > be sent from the receiving end.
>>>> >
>>>> > Attached is a sample main that can reproduce the problem I am facing
>>>> - I
>>>> > have modified the sample helloproducer helloconsumer code available
>>>> at
>>>> > http://activemq.org/site/activemq-cpp-client.html
>>>> > to reproduce my problem so that its easier for you to see.
>>>> >
>>>> >  http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp
>>>> >
>>>> > If you execute this piece of code you will see the output as in the
>>>> > attached image file
>>>> >
>>>> >  http://www.nabble.com/file/6110/DestinationProblem.JPG
>>>> >
>>>> > This is what the code does
>>>> > producer lets call it xxx sends a "Hello world! from thread xxxx" to
>>>> the
>>>> > consumer
>>>> > consumer lets call it yyy receives the message and displays it
>>>> > this is the normal behavior as given in the example on
>>>> > http://activemq.org/site/activemq-cpp-client.html
>>>> > Following extra needs to be done now
>>>> > from yyy a reply should go back to xxx ... for this i registered a
>>>> > producer at yyy by creating a topic using the
>>>> message->getCMSReplyTo()
>>>> and
>>>> > then replying back to that destination.
>>>> >
>>>> > The mismatch can be easily see by doing a bstat .... when I created
>>>> the
>>>> > consumer yyy initially it created a topic by name say ABCDEFGH (which
>>>> is
>>>> a
>>>> > random id) and later on when I used the message->getCMSReplyTo()
to
>>>> create
>>>> > a topic the topic was registered with the name /topic/ABCDEFGH .....
>>>> the
>>>> > additional /topic/ that has got added is doing a mess up here and the
>>>> > replies from yyy to xxx are not reaching xxx (getting enqueued and
>>>> not
>>>> > dequeued) ...
>>>> >
>>>> > the /topic/ gets added due to the following statement in
>>>> > HelloWorldProducer - run method
>>>> > message->setCMSReplyTo(replyTopic->toProviderString());
>>>> > here the toProviderString method adds it actually ... if you replace
>>>> this
>>>> > method with just the toString() method ... you will get a stomp
>>>> exception
>>>> > saying that destinations should start with either /topic/ or /queue/
>>>> >
>>>> > Can somebody make this code work please.
>>>> >
>>>> > For every message sent by the producer to consumer "Hello world! from
>>>> > thread xxxx" there should be a reply coming back as "Thank you for
>>>> Hello
>>>> > World !!!"
>>>> >
>>>> > Please help me urgently here.
>>>> >
>>>> > Thank you in advance
>>>> >
>>>> > Lalit Nagpal
>>>> > CSA, SunGard
>>>> >
>>>> >
>>>>
>>>> --
>>>> View this message in context:
>>>> http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
>>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>>>>
>>>>
>>> 
>>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8764227
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message