activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bish, Tim" <Tim.B...@Sensis.com>
Subject RE: above topic
Date Thu, 16 Nov 2006 12:12:34 GMT
> 
> 
> follow program,why cann't receive data?
> How should I modify code?

Looks like you produce before the durable consumer has been connected
once. 

See this link:
http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html

A durable consumer must have connected and then disconnected before
messages are persisted in anticipation of the consumer reconnecting.
That's why its important to use the same subscription name when
reconnecting, so that the broker know that the consumer that was
subscribed as durable is now back and it can deliver the messages that
were stored in its absence.  

> 
> #include <activemq/concurrent/Thread.h>
> #include <activemq/concurrent/Runnable.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Integer.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> 
> using namespace activemq::core;
> using namespace activemq::util;
> using namespace activemq::concurrent;
> using namespace cms;
> using namespace std;
> 
> class HelloWorldProducer : public Runnable {
> private:
> 
> 	Connection* connection;
> 	Session* session;
> 	Topic* destination;
> 	MessageProducer* producer;
> 	int numMessages;
> 
> public:
> 
> 	HelloWorldProducer( int numMessages ){
> 		connection = NULL;
>     	session = NULL;
>     	destination = NULL;
>     	producer = NULL;
>     	this->numMessages = numMessages;
> 	}
> 
> 	virtual ~HelloWorldProducer(){
> 		cleanup();
> 	}
> 
>     virtual void run() {
>         try {
> 			string user,passwd,sID;
> 			user="default";
> 			passwd="";
> 			sID="lsgID";
>             ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
> 
>             connection =
> connectionFactory->createConnection(user,passwd,sID);
>             connection->start();
> 
> 			string sss=connection->getClientId();
> 			cout << sss << endl;
> 
>             session = connection->createSession(
Session::AUTO_ACKNOWLEDGE
> );
> 			destination = session->createTopic( "mytopic" );
> 
> 			producer = session->createProducer( destination
);
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
> 
> 			producer->setTimeToLive(100000000);
>             string threadIdStr = Integer::toString( Thread::getId() );
> 
>             // Create a messages
>             string text = (string)"Hello world! from thread " +
> threadIdStr;
> 
>             for( int ix=0; ix<numMessages; ++ix ){
> 	            TextMessage* message = session->createTextMessage(
text
> );
> 
> 				string messageID="messageID";
> 				message->setCMSExpiration(10000000000);
> 				message->setCMSMessageId(messageID);
> 
>     	        // Tell the producer to send the message
>         	    printf( "Sent message from thread %s\n",
> threadIdStr.c_str() );
>         		producer->send( message );
> 
>             	delete message;
>             }
> 
>         }catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
> 
> private:
> 
>     void cleanup(){
> 
> 			// Destroy resources.
> 			try{
>             	if( destination != NULL ) delete destination;
> 			}catch ( CMSException& e ) {}
> 			destination = NULL;
> 
> 			try{
> 	            if( producer != NULL ) delete producer;
> 			}catch ( CMSException& e ) {}
> 			producer = NULL;
> 
>     		// Close open resources.
>     		try{
>     			if( session != NULL ) session->close();
>     			if( connection != NULL ) connection->close();
> 			}catch ( CMSException& e ) {}
> 
> 			try{
>             	if( session != NULL ) delete session;
> 			}catch ( CMSException& e ) {}
> 			session = NULL;
> 
>             try{
>             	if( connection != NULL ) delete connection;
> 			}catch ( CMSException& e ) {}
>     		connection = NULL;
>     }
> };
> 
> class HelloWorldConsumer : public ExceptionListener,
>                            public MessageListener,
>                            public Runnable {
> 
> private:
> 
> 	Connection* connection;
> 	Session* session;
> 	Topic* destination;
> 	MessageConsumer* consumer;
> 	long waitMillis;
> 
> public:
> 
> 	HelloWorldConsumer( long waitMillis ){
> 		connection = NULL;
>     	session = NULL;
>     	destination = NULL;
>     	consumer = NULL;
>     	this->waitMillis = waitMillis;
> 	}
>     virtual ~HelloWorldConsumer(){
>     	cleanup();
>     }
> 
>     virtual void run() {
> 
>         try {
> 
> 			string user,passwd,sID;
> 			user="default";
> 			passwd="";
> 			sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory =
>                 new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID);
> 
>             // Create a Connection
>             connection =
> connectionFactory->createConnection();//user,passwd,sID);
>             delete connectionFactory;
>             connection->start();
> 
>             connection->setExceptionListener(this);
> 
>             // Create a Session
>             session = connection->createSession(
Session::AUTO_ACKNOWLEDGE
> );
> 			destination = session->createTopic( "mytopic" );
> 			consumer = session->createDurableConsumer(
destination ,
> user ,
> "",false);
> 
>             consumer->setMessageListener( this );
> 
>             Thread::sleep( waitMillis );
> 
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
> 
>     virtual void onMessage( const Message* message ){
> 
>         try
>         {
>     	    const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
> 
>     virtual void onException( const CMSException& ex ) {
>         printf("JMS Exception occured.  Shutting down client.\n");
>     }
> 
> private:
> 
>     void cleanup(){
> 
> 		// Destroy resources.
> 		try{
>         	if( destination != NULL ) delete destination;
> 		}catch (CMSException& e) {}
> 		destination = NULL;
> 
> 		try{
>             if( consumer != NULL ) delete consumer;
> 		}catch (CMSException& e) {}
> 		consumer = NULL;
> 
> 		// Close open resources.
> 		try{
> 			if( session != NULL ) session->close();
> 			if( connection != NULL ) connection->close();
> 		}catch (CMSException& e) {}
> 
>         try{
>         	if( session != NULL ) delete session;
> 		}catch (CMSException& e) {}
> 		session = NULL;
> 
> 		try{
>         	if( connection != NULL ) delete connection;
> 		}catch (CMSException& e) {}
> 		connection = NULL;
>     }
> };
>   void Produce()
>  {
>      HelloWorldProducer producer( 2 );
>      Thread producerThread( &producer );
>      producerThread.start();
>      producerThread.join();
>  }
>  void Consumer()
>  {
>       HelloWorldConsumer consumer( 10000 );
>       Thread consumerThread( &consumer );
>       consumerThread.start();
>       consumerThread.join();
>  }
>  int main(int argc, char* argv[])
>  {
>       Produce();
>       Consumer();
>  }
> --
> View this message in context: http://www.nabble.com/above-topic-
> tf2641566.html#a7373622
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message