activemq-users mailing list archives

From "Nathan Mittler" <nathan.mitt...@gmail.com>
Subject Re: activemq cms stomp - creating a topic from message->getCMSReplyTo() does not work
Date Thu, 01 Feb 2007 11:08:23 GMT
Hi Lalit,
I could tell from the code that you were using the 1.0 version of
activemq-cpp.  The new code uses cms::Destination objects on the get/set
CMSReplyTo methods.  Please give the latest trunk a try.

BTW, svn has been moved.  You can get the latest trunk here
https://svn.apache.org/repos/asf/activemq/activemq-cpp/trunk/activemq-cpp/
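
If you have to stay on 1.0 for a while, one possible workaround is to strip
the Stomp prefix from the reply-to string before handing it to createTopic().
This is only a minimal sketch, untested against a broker: stripStompPrefix is
a hypothetical helper, not part of activemq-cpp, and it assumes that the
"/topic/" or "/queue/" prefix you saw is the only difference between the two
names.

```cpp
#include <string>

// Hypothetical helper (not part of activemq-cpp): removes a leading
// "/topic/" or "/queue/" from a Stomp destination string, if present.
// Strings without either prefix are returned unchanged.
std::string stripStompPrefix(const std::string& providerName) {
    static const std::string prefixes[] = { "/topic/", "/queue/" };
    for (const std::string& prefix : prefixes) {
        // compare() clamps the length, so short inputs are handled safely.
        if (providerName.compare(0, prefix.size(), prefix) == 0) {
            return providerName.substr(prefix.size());
        }
    }
    return providerName;
}
```

In the 1.0 consumer you would then call something like
session->createTopic( stripStompPrefix( textMessage->getCMSReplyTo() ) ),
so that a reply-to of /topic/ABCDEFGH is registered as ABCDEFGH again.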

Regards,
Nate

On 2/1/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
>
>
>
> Hi Nathan-
>
> Requesting to reopen the issue given at
> https://issues.apache.org/activemq/browse/AMQCPP-64
>
> I had already tried the solution you provided ... it throws a
> StompException ....
> It seems to be some kind of inconsistency in how activemq uses stomp ...
>
> http://www.nabble.com/file/6142/StompExceptionWithToStringMethod.jpg
>
> I appreciate all the help ...
>
> Thank you
>
> Lalit Nagpal
> CSA, SunGard
>
>
>
> Lalit Nagpal wrote:
> >
> > created JIRA issue at
> > https://issues.apache.org/activemq/browse/AMQCPP-64
> >
> > Attached: source code to reproduce and the image file for output.
> >
> > Is there a rough timeline for when we can expect a solution for this?
> >
> > Thanks
> >
> > Lalit Nagpal
> > CSA, SunGard
> >
> >
> >
> > nmittler wrote:
> >>
> >> Yep, something is definitely not right.  Please create a JIRA issue here
> >> http://issues.apache.org/activemq/browse/AMQCPP
> >>
> >> ... be sure to attach the code and your output image.
> >>
> >> Thanks!
> >> Nate
> >>
> >> On 1/30/07, Lalit Nagpal <lalittechy@rediffmail.com> wrote:
> >>>
> >>>
> >>> Copy pasting the entire sample main code to reproduce my issue
> >>>
> >>> #include "stdafx.h"
> >>> #include <activemq/concurrent/Thread.h>
> >>> #include <activemq/concurrent/Runnable.h>
> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
> >>> #include <cms/Connection.h>
> >>> #include <cms/Session.h>
> >>> #include <cms/TextMessage.h>
> >>> #include <cms/ExceptionListener.h>
> >>> #include <cms/MessageListener.h>
> >>> #include <stdlib.h>
> >>> #include <stdio.h>
> >>> #include <string.h>
> >>> #include <iostream>
> >>> #include <string>
> >>>
> >>> using namespace activemq::core;
> >>> using namespace activemq::concurrent;
> >>> using namespace cms;
> >>> using namespace std;
> >>>
> >>> bool GenerateGuid(char* buf, size_t strsz)
> >>> {
> >>>         GUID a_guid;
> >>>
> >>>         UuidCreate(&a_guid);
> >>>
> >>>         if (strsz > 38)
> >>>         {
> >>>                 sprintf(buf,
> >>>                         "{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
> >>>                         a_guid.Data1, a_guid.Data2, a_guid.Data3,
> >>>                         a_guid.Data4[0], a_guid.Data4[1],
> >>>                         a_guid.Data4[2], a_guid.Data4[3],
> >>>                         a_guid.Data4[4], a_guid.Data4[5],
> >>>                         a_guid.Data4[6], a_guid.Data4[7]
> >>>                         );
> >>>                 return true;
> >>>         }
> >>>         return false;
> >>> };
> >>>
> >>> class HelloWorldProducer : public Runnable, public MessageListener {
> >>> private:
> >>>
> >>>         Connection* connection;
> >>>         Session* session;
> >>>         Destination* destination;
> >>>         MessageProducer* producer;
> >>>         MessageConsumer* consumer;
> >>>         int numMessages;
> >>>
> >>> public:
> >>>
> >>>         HelloWorldProducer( int numMessages ){
> >>>                 connection = NULL;
> >>>                 session = NULL;
> >>>                 destination = NULL;
> >>>                 producer = NULL;
> >>>                 consumer = NULL;
> >>>                 this->numMessages = numMessages;
> >>>         }
> >>>
> >>>     virtual void onMessage( const Message* message ){
> >>>
> >>>         try
> >>>         {
> >>>             const TextMessage* textMessage =
> >>>                 dynamic_cast< const TextMessage* >( message );
> >>>             string text = textMessage->getText();
> >>>             printf( "Producer Received: %s\n", text.c_str() );
> >>>         } catch (CMSException& e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>>         virtual ~HelloWorldProducer(){
> >>>                 cleanup();
> >>>         }
> >>>
> >>>     virtual void run() {
> >>>         try {
> >>>             // Create a ConnectionFactory
> >>>             ActiveMQConnectionFactory* connectionFactory =
> >>>                 new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );
> >>>
> >>>             // Create a Connection
> >>>             connection = connectionFactory->createConnection();
> >>>             delete connectionFactory;  // no longer needed, as in the consumer below
> >>>             connection->start();
> >>>
> >>>             // Create a Session
> >>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> >>>
> >>>             // Create the destination (Topic or Queue)
> >>>             destination = session->createTopic( "TEST.FOO" );
> >>>
> >>>             // Create a MessageProducer from the Session to the Topic or Queue
> >>>             cout << endl << "Producer has been registered at "
> >>>                  << destination->toString() << endl << endl;
> >>>             producer = session->createProducer( destination );
> >>>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
> >>>
> >>>             // Let's have a reply-back channel to this producer now
> >>>             char charGuid[40];
> >>>             GenerateGuid( charGuid, sizeof(charGuid) );
> >>>             cms::Topic* replyTopic = session->createTopic( charGuid );
> >>>             consumer = session->createConsumer( replyTopic );
> >>>             consumer->setMessageListener( this );
> >>>             cout << "Reply back channel has been registered at "
> >>>                  << replyTopic->toString() << endl << endl;
> >>>
> >>>             // Stringify the thread id
> >>>             char threadIdStr[100];
> >>>             _snprintf( threadIdStr, sizeof(threadIdStr), "%d", Thread::getId() );
> >>>
> >>>             // Create the message text
> >>>             string text = (string)"Hello world! from thread " + threadIdStr;
> >>>
> >>>             for( int ix=0; ix<numMessages; ++ix ){
> >>>                 TextMessage* message = session->createTextMessage( text );
> >>>                 message->setCMSReplyTo( replyTopic->toProviderString() );
> >>>
> >>>                 // Tell the producer to send the message
> >>>                 printf( "Producer Sent message from thread %s\n", threadIdStr );
> >>>                 producer->send( message );
> >>>
> >>>                 delete message;
> >>>             }
> >>>
> >>>         }catch ( CMSException& e ) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>> private:
> >>>
> >>>     void cleanup(){
> >>>
> >>>         // Destroy resources.
> >>>         try{
> >>>             if( destination != NULL ) delete destination;
> >>>         }catch ( CMSException& e ) {}
> >>>         destination = NULL;
> >>>
> >>>         try{
> >>>             if( producer != NULL ) delete producer;
> >>>         }catch ( CMSException& e ) {}
> >>>         producer = NULL;
> >>>
> >>>         // Close open resources.
> >>>         try{
> >>>             if( session != NULL ) session->close();
> >>>             if( connection != NULL ) connection->close();
> >>>         }catch ( CMSException& e ) {}
> >>>
> >>>         try{
> >>>             if( session != NULL ) delete session;
> >>>         }catch ( CMSException& e ) {}
> >>>         session = NULL;
> >>>
> >>>         try{
> >>>             if( connection != NULL ) delete connection;
> >>>         }catch ( CMSException& e ) {}
> >>>         connection = NULL;
> >>>     }
> >>> };
> >>>
> >>> class HelloWorldConsumer : public ExceptionListener,
> >>>                            public MessageListener,
> >>>                            public Runnable {
> >>>
> >>> private:
> >>>
> >>>         Connection* connection;
> >>>         Session* session;
> >>>         Destination* destination;
> >>>         MessageConsumer* consumer;
> >>>         long waitMillis;
> >>>
> >>> public:
> >>>
> >>>         HelloWorldConsumer( long waitMillis ){
> >>>                 connection = NULL;
> >>>         session = NULL;
> >>>         destination = NULL;
> >>>         consumer = NULL;
> >>>         this->waitMillis = waitMillis;
> >>>         }
> >>>     virtual ~HelloWorldConsumer(){
> >>>         cleanup();
> >>>     }
> >>>
> >>>     virtual void run() {
> >>>
> >>>         try {
> >>>
> >>>             // Create a ConnectionFactory
> >>>             ActiveMQConnectionFactory* connectionFactory =
> >>>                 new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );
> >>>
> >>>             // Create a Connection
> >>>             connection = connectionFactory->createConnection();
> >>>             delete connectionFactory;
> >>>             connection->start();
> >>>
> >>>             connection->setExceptionListener(this);
> >>>
> >>>             // Create a Session
> >>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
> >>>
> >>>             // Create the destination (Topic or Queue)
> >>>             destination = session->createTopic( "TEST.FOO" );
> >>>
> >>>             // Create a MessageConsumer from the Session to the Topic or Queue
> >>>             consumer = session->createConsumer( destination );
> >>>
> >>>             consumer->setMessageListener( this );
> >>>
> >>>             // Sleep while asynchronous messages come in.
> >>>             Thread::sleep( waitMillis );
> >>>
> >>>         } catch (CMSException& e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>>     virtual void onMessage( const Message* message ){
> >>>
> >>>         try
> >>>         {
> >>>                         // display the message received
> >>>             const TextMessage* textMessage =
> >>>                 dynamic_cast< const TextMessage* >( message );
> >>>             string text = textMessage->getText();
> >>>             printf( "Consumer Received: %s\n", text.c_str() );
> >>>
> >>>             // Let's reply back with a thanks
> >>>             string replyTo = textMessage->getCMSReplyTo();
> >>>             if( !replyTo.empty() && replyTo != "null" ) {
> >>>                 cms::Topic* destination_ = session->createTopic( replyTo );
> >>>                 MessageProducer* producer = session->createProducer( destination_ );
> >>>                 producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
> >>>                 cout << endl << "Consumer Replying back to "
> >>>                      << destination_->toString() << endl;
> >>>                 TextMessage* message_ =
> >>>                     session->createTextMessage( "Thank you for Hello World !!!" );
> >>>                 producer->send( message_ );
> >>>
> >>>                 // Release the per-reply objects created above
> >>>                 delete message_;
> >>>                 delete producer;
> >>>                 delete destination_;
> >>>             }
> >>>
> >>>         } catch (CMSException& e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>>     virtual void onException( const CMSException& ex ) {
> >>>         printf("JMS Exception occurred.  Shutting down client.\n");
> >>>     }
> >>>
> >>> private:
> >>>
> >>>     void cleanup(){
> >>>
> >>>         // Destroy resources.
> >>>         try{
> >>>             if( destination != NULL ) delete destination;
> >>>         }catch (CMSException& e) {}
> >>>         destination = NULL;
> >>>
> >>>         try{
> >>>             if( consumer != NULL ) delete consumer;
> >>>         }catch (CMSException& e) {}
> >>>         consumer = NULL;
> >>>
> >>>         // Close open resources.
> >>>         try{
> >>>             if( session != NULL ) session->close();
> >>>             if( connection != NULL ) connection->close();
> >>>         }catch (CMSException& e) {}
> >>>
> >>>         try{
> >>>             if( session != NULL ) delete session;
> >>>         }catch (CMSException& e) {}
> >>>         session = NULL;
> >>>
> >>>         try{
> >>>             if( connection != NULL ) delete connection;
> >>>         }catch (CMSException& e) {}
> >>>         connection = NULL;
> >>>     }
> >>> };
> >>>
> >>> int _tmain(int argc, _TCHAR* argv[])
> >>> {
> >>>     HelloWorldProducer producer( 5 );
> >>>         HelloWorldConsumer consumer( 1500 );
> >>>
> >>>         // Start the consumer thread.
> >>>         Thread consumerThread( &consumer );
> >>>         consumerThread.start();
> >>>
> >>>         // Start the producer thread.
> >>>         Thread producerThread( &producer );
> >>>         producerThread.start ();
> >>>
> >>>         // Wait for the threads to complete.
> >>>         producerThread.join();
> >>>         consumerThread.join();
> >>>
> >>>         cout << endl << endl << endl;
> >>>
> >>>         return 0;
> >>> }
> >>>
> >>>
> >>>
> >>>
> >>>
> >>>
> >>> Lalit Nagpal wrote:
> >>> >
> >>> > Hi
> >>> >
> >>> > I am using the activemq-cpp cms api 1.0 release. The problem I am
> >>> > facing is this: my producer sends a message to the consumer, and a
> >>> > message should then be sent back from the receiving end as a reply.
> >>> > Consider a situation where a loginRequest message has been sent and
> >>> > now a loginReply message should be sent from the receiving end.
> >>> >
> >>> > Attached is a sample main that reproduces the problem I am facing. I
> >>> > have modified the sample helloproducer/helloconsumer code available at
> >>> > http://activemq.org/site/activemq-cpp-client.html
> >>> > so that it's easier for you to see.
> >>> >
> >>> >  http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp
> >>> >
> >>> > If you execute this piece of code you will see the output as in the
> >>> > attached image file
> >>> >
> >>> >  http://www.nabble.com/file/6110/DestinationProblem.JPG
> >>> >
> >>> > This is what the code does:
> >>> > The producer (let's call it xxx) sends "Hello world! from thread xxxx"
> >>> > to the consumer.
> >>> > The consumer (let's call it yyy) receives the message and displays it.
> >>> > This is the normal behavior, as given in the example on
> >>> > http://activemq.org/site/activemq-cpp-client.html
> >>> > The following extra step is needed now: a reply should go back from
> >>> > yyy to xxx. For this I registered a producer at yyy by creating a
> >>> > topic using message->getCMSReplyTo() and then replying to that
> >>> > destination.
> >>> >
> >>> > The mismatch can easily be seen by doing a bstat. When I initially
> >>> > created the consumer yyy, it created a topic named, say, ABCDEFGH (a
> >>> > random id). Later, when I used message->getCMSReplyTo() to create a
> >>> > topic, the topic was registered with the name /topic/ABCDEFGH. The
> >>> > additional /topic/ prefix causes the problem: the replies from yyy to
> >>> > xxx never reach xxx (they are enqueued but not dequeued).
> >>> >
> >>> > The /topic/ prefix gets added by the following statement in the
> >>> > HelloWorldProducer run method:
> >>> > message->setCMSReplyTo(replyTopic->toProviderString());
> >>> > The toProviderString method is what adds it. If you replace it with
> >>> > just the toString() method, you get a stomp exception saying that
> >>> > destinations should start with either /topic/ or /queue/.
> >>> >
> >>> > Can somebody make this code work, please?
> >>> >
> >>> > For every "Hello world! from thread xxxx" message sent by the producer
> >>> > to the consumer, there should be a reply coming back as "Thank you
> >>> > for Hello World !!!".
> >>> >
> >>> > Please help me urgently here.
> >>> >
> >>> > Thank you in advance
> >>> >
> >>> > Lalit Nagpal
> >>> > CSA, SunGard
> >>> >
> >>> >
> >>>
> >>> --
> >>> View this message in context:
> >>> http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
>
> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>>
> >>>
> >>
> >>
> >
> >
>
> --
> View this message in context: http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8743627
>
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
