activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Lalit Nagpal <lalitte...@rediffmail.com>
Subject Re: activemq cms stomp - creating a topic from message->getCMSReplyTo() does not work
Date Tue, 30 Jan 2007 13:45:18 GMT

Copy-pasting the entire sample main code to reproduce my issue:

#include "stdafx.h"
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>

using namespace activemq::core;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

bool GenerateGuid(char* buf, size_t strsz)
{
	GUID a_guid;

	UuidCreate(&a_guid);	

	if (strsz > 38)
	{
		sprintf(buf, 
				"{%08X-%04X-%04X-%02X%02X-%02X%02X%02X%02X%02X%02X}",
				a_guid.Data1, a_guid.Data2, a_guid.Data3,
				a_guid.Data4[0], a_guid.Data4[1],
				a_guid.Data4[2], a_guid.Data4[3], 
				a_guid.Data4[4], a_guid.Data4[5], 
				a_guid.Data4[6], a_guid.Data4[7]
				);
		return true;
	}
	return false;
};

class HelloWorldProducer : public Runnable, public MessageListener {
private:
	
	Connection* connection;
	Session* session;
	Destination* destination;
	MessageProducer* producer;
	MessageConsumer* consumer;
	int numMessages;

public:
	
	HelloWorldProducer( int numMessages ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	producer = NULL;
    	this->numMessages = numMessages;
	}
	
    virtual void onMessage( const Message* message ){
    	
        try
        {
    	    const TextMessage* textMessage = dynamic_cast< const TextMessage*
>( message );
            string text = textMessage->getText();
            printf( "Producer Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

	virtual ~HelloWorldProducer(){
		cleanup();
	}
	
    virtual void run() {
        try {
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://127.0.0.1:61617");

            // Create a Connection
            connection = connectionFactory->createConnection();
            connection->start();

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);

            // Create the destination (Topic or Queue)
            destination = session->createTopic( "TEST.FOO" );

            // Create a MessageProducer from the Session to the Topic or
Queue
			cout << endl << "Producer has been registered at " <<
destination->toString() << endl << endl;
            producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );

			// Lets have a reply-back channel to this producer now
			char* charGuid = new char[40];
			GenerateGuid(charGuid, 39);
			cms::Topic* replyTopic = session->createTopic(charGuid);
			consumer = session->createConsumer( replyTopic );
			consumer->setMessageListener(this);
			cout << "Reply back channel has been registered at " <<
replyTopic->toString() << endl << endl;

            // Stringify the thread id
            char threadIdStr[100];
            _snprintf(threadIdStr, sizeof(threadIdStr), "%d",
Thread::getId() );
            
            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;
            
            for( int ix=0; ix<numMessages; ++ix ){
	            TextMessage* message = session->createTextMessage( text );
				message->setCMSReplyTo(replyTopic->toProviderString());

    	        // Tell the producer to send the message
        	    printf( "Producer Sent message from thread %s\n", threadIdStr);
            	producer->send( message );
            	
            	delete message;
            }
			
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
    
private:

    void cleanup(){
    				
			// Destroy resources.
			try{                        
            	if( destination != NULL ) delete destination;
			}catch ( CMSException& e ) {}
			destination = NULL;
			
			try{
	            if( producer != NULL ) delete producer;
			}catch ( CMSException& e ) {}
			producer = NULL;
			
    		// Close open resources.
    		try{
    			if( session != NULL ) session->close();
    			if( connection != NULL ) connection->close();
			}catch ( CMSException& e ) {}

			try{
            	if( session != NULL ) delete session;
			}catch ( CMSException& e ) {}
			session = NULL;
			
            try{
            	if( connection != NULL ) delete connection;
			}catch ( CMSException& e ) {}
    		connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener,
                           public Runnable {
	
private:
	
	Connection* connection;
	Session* session;
	Destination* destination;
	MessageConsumer* consumer;
	long waitMillis;
		
public: 

	HelloWorldConsumer( long waitMillis ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	consumer = NULL;
    	this->waitMillis = waitMillis;
	}
    virtual ~HelloWorldConsumer(){    	
    	cleanup();
    }
    
    virtual void run() {
    	    	
        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory( "tcp://127.0.0.1:61617" );

            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;
            connection->start();
            
            connection->setExceptionListener(this);

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);

            // Create the destination (Topic or Queue)
            destination = session->createTopic( "TEST.FOO" );

            // Create a MessageConsumer from the Session to the Topic or
Queue
            consumer = session->createConsumer( destination );
            
            consumer->setMessageListener( this );
            
            // Sleep while asynchronous messages come in.
            Thread::sleep( waitMillis );		
            
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    
    virtual void onMessage( const Message* message ){
    	
        try
        {
			// display the message received
    	    const TextMessage* textMessage = dynamic_cast< const TextMessage*
>( message );
            string text = textMessage->getText();
            printf( "Consumer Received: %s\n", text.c_str() );

			// lets reply back with a thanks
			if (textMessage->getCMSReplyTo().c_str() && strcmp("null",
textMessage->getCMSReplyTo().c_str())==1 ) {
				cms::Topic* destination_ = session->createTopic(
textMessage->getCMSReplyTo().c_str() );
				MessageProducer* producer = session->createProducer( destination_ );
				producer->setDeliveryMode( DeliveryMode::NON_PERSISTANT );
				cout << endl << "Consumer Replying back to " << destination_->toString()
<< endl;
				TextMessage* message_ = session->createTextMessage( "Thank you for Hello
World !!!" );
				producer->send(message_);
			}

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }
    
private:

    void cleanup(){
    	
		// Destroy resources.
		try{                        
        	if( destination != NULL ) delete destination;
		}catch (CMSException& e) {}
		destination = NULL;
		
		try{
            if( consumer != NULL ) delete consumer;
		}catch (CMSException& e) {}
		consumer = NULL;
		
		// Close open resources.
		try{
			if( session != NULL ) session->close();
			if( connection != NULL ) connection->close();
		}catch (CMSException& e) {}
		
        try{
        	if( session != NULL ) delete session;
		}catch (CMSException& e) {}
		session = NULL;
		
		try{
        	if( connection != NULL ) delete connection;
		}catch (CMSException& e) {}
		connection = NULL;
    }
};

int _tmain(int argc, _TCHAR* argv[])
{
    HelloWorldProducer producer( 5 );
	HelloWorldConsumer consumer( 1500 );
	
	// Start the consumer thread.
	Thread consumerThread( &consumer );
	consumerThread.start();
	
	// Start the producer thread.
	Thread producerThread( &producer );
	producerThread.start();

	// Wait for the threads to complete.
	producerThread.join();
	consumerThread.join();
	
	cout << endl << endl << endl;

	return 0;
}






Lalit Nagpal wrote:
> 
> Hi
> 
> I am using the activemq-cpp cms api 1.0 release. The problem I am facing
> is like this- 
> My producer sends a message to the consumer and a message should be sent
> from the receiving end as a reply after this - consider a situation where
> a loginRequest message has been sent and now a loginReply message should
> be sent from the receiving end.
> 
> Attached is a sample main that can reproduce the problem I am facing - I
> have modified the sample helloproducer helloconsumer code available at 
> http://activemq.org/site/activemq-cpp-client.html
> to reproduce my problem so that it's easier for you to see.
> 
>  http://www.nabble.com/file/6111/DestProbs.cpp DestProbs.cpp 
> 
> If you execute this piece of code you will see the output as in the
> attached image file
> 
>  http://www.nabble.com/file/6110/DestinationProblem.JPG 
> 
> This is what the code does
> producer lets call it xxx sends a "Hello world! from thread xxxx" to the
> consumer
> consumer lets call it yyy receives the message and displays it
> this is the normal behavior as given in the example on
> http://activemq.org/site/activemq-cpp-client.html
> The following extra step needs to be done now:
> from yyy a reply should go back to xxx ... for this I registered a
> producer at yyy by creating a topic using the message->getCMSReplyTo() and
> then replying back to that destination.
> 
> The mismatch can be easily seen by doing a bstat .... when I created the
> consumer yyy initially it created a topic by name say ABCDEFGH (which is a
> random id) and later on when I used the message->getCMSReplyTo() to create
> a topic the topic was registered with the name /topic/ABCDEFGH ..... the
> additional /topic/ that has got added is messing things up here and the
> replies from yyy to xxx are not reaching xxx (getting enqueued and not
> dequeued) ... 
> 
> the /topic/ gets added due to the following statement in
> HelloWorldProducer - run method
> message->setCMSReplyTo(replyTopic->toProviderString()); 
> here the toProviderString method adds it actually ... if you replace this
> method with just the toString() method ... you will get a stomp exception
> saying that destinations should start with either /topic/ or /queue/
> 
> Can somebody make this code work please.
> 
> For every message sent by the producer to consumer "Hello world! from
> thread xxxx" there should be a reply coming back as "Thank you for Hello
> World !!!"
> 
> Please help me urgently here.
> 
> Thank you in advance
> 
> Lalit Nagpal
> CSA, SunGard
> 
> 

-- 
View this message in context: http://www.nabble.com/activemq-cms-stomp---creating-a-topic-from-message-%3EgetCMSReplyTo%28%29-does-not-work-tf3142309.html#a8709155
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message