activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject RE: above topic
Date Fri, 17 Nov 2006 03:01:39 GMT

How do I achieve durable subscription base on my program?



sgliu wrote:
> 
> Thanks for you advice.I wil read your tell me content.
> 
> 
> 
> 
> tabish121 wrote:
>> 
>>> 
>>> 
>>> follow program,why cann't receive data?
>>> How should I modify code?
>> 
>> Looks like you produce before the durable consumer has been connected
>> once. 
>> 
>> See this link:
>> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> 
>> A durable consumer must have connected and then disconnected before
>> messages are persisted in anticipation of the consumer reconnecting.
>> That's why its important to use the same subscription name when
>> reconnecting, so that the broker know that the consumer that was
>> subscribed as durable is now back and it can deliver the messages that
>> were stored in its absence.  
>> 
>>> 
>>> #include <activemq/concurrent/Thread.h>
>>> #include <activemq/concurrent/Runnable.h>
>>> #include <activemq/core/ActiveMQConnectionFactory.h>
>>> #include <activemq/util/Integer.h>
>>> #include <cms/Connection.h>
>>> #include <cms/Session.h>
>>> #include <cms/TextMessage.h>
>>> #include <cms/ExceptionListener.h>
>>> #include <cms/MessageListener.h>
>>> #include <stdlib.h>
>>> 
>>> using namespace activemq::core;
>>> using namespace activemq::util;
>>> using namespace activemq::concurrent;
>>> using namespace cms;
>>> using namespace std;
>>> 
>>> class HelloWorldProducer : public Runnable {
>>> private:
>>> 
>>> 	Connection* connection;
>>> 	Session* session;
>>> 	Topic* destination;
>>> 	MessageProducer* producer;
>>> 	int numMessages;
>>> 
>>> public:
>>> 
>>> 	HelloWorldProducer( int numMessages ){
>>> 		connection = NULL;
>>>     	session = NULL;
>>>     	destination = NULL;
>>>     	producer = NULL;
>>>     	this->numMessages = numMessages;
>>> 	}
>>> 
>>> 	virtual ~HelloWorldProducer(){
>>> 		cleanup();
>>> 	}
>>> 
>>>     virtual void run() {
>>>         try {
>>> 			string user,passwd,sID;
>>> 			user="default";
>>> 			passwd="";
>>> 			sID="lsgID";
>>>             ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             connection =
>>> connectionFactory->createConnection(user,passwd,sID);
>>>             connection->start();
>>> 
>>> 			string sss=connection->getClientId();
>>> 			cout << sss << endl;
>>> 
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> 			destination = session->createTopic( "mytopic" );
>>> 
>>> 			producer = session->createProducer( destination
>> );
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>> 
>>> 			producer->setTimeToLive(100000000);
>>>             string threadIdStr = Integer::toString( Thread::getId() );
>>> 
>>>             // Create a messages
>>>             string text = (string)"Hello world! from thread " +
>>> threadIdStr;
>>> 
>>>             for( int ix=0; ix<numMessages; ++ix ){
>>> 	            TextMessage* message = session->createTextMessage(
>> text
>>> );
>>> 
>>> 				string messageID="messageID";
>>> 				message->setCMSExpiration(10000000000);
>>> 				message->setCMSMessageId(messageID);
>>> 
>>>     	        // Tell the producer to send the message
>>>         	    printf( "Sent message from thread %s\n",
>>> threadIdStr.c_str() );
>>>         		producer->send( message );
>>> 
>>>             	delete message;
>>>             }
>>> 
>>>         }catch ( CMSException& e ) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>> 			// Destroy resources.
>>> 			try{
>>>             	if( destination != NULL ) delete destination;
>>> 			}catch ( CMSException& e ) {}
>>> 			destination = NULL;
>>> 
>>> 			try{
>>> 	            if( producer != NULL ) delete producer;
>>> 			}catch ( CMSException& e ) {}
>>> 			producer = NULL;
>>> 
>>>     		// Close open resources.
>>>     		try{
>>>     			if( session != NULL ) session->close();
>>>     			if( connection != NULL ) connection->close();
>>> 			}catch ( CMSException& e ) {}
>>> 
>>> 			try{
>>>             	if( session != NULL ) delete session;
>>> 			}catch ( CMSException& e ) {}
>>> 			session = NULL;
>>> 
>>>             try{
>>>             	if( connection != NULL ) delete connection;
>>> 			}catch ( CMSException& e ) {}
>>>     		connection = NULL;
>>>     }
>>> };
>>> 
>>> class HelloWorldConsumer : public ExceptionListener,
>>>                            public MessageListener,
>>>                            public Runnable {
>>> 
>>> private:
>>> 
>>> 	Connection* connection;
>>> 	Session* session;
>>> 	Topic* destination;
>>> 	MessageConsumer* consumer;
>>> 	long waitMillis;
>>> 
>>> public:
>>> 
>>> 	HelloWorldConsumer( long waitMillis ){
>>> 		connection = NULL;
>>>     	session = NULL;
>>>     	destination = NULL;
>>>     	consumer = NULL;
>>>     	this->waitMillis = waitMillis;
>>> 	}
>>>     virtual ~HelloWorldConsumer(){
>>>     	cleanup();
>>>     }
>>> 
>>>     virtual void run() {
>>> 
>>>         try {
>>> 
>>> 			string user,passwd,sID;
>>> 			user="default";
>>> 			passwd="";
>>> 			sID="lsgID";
>>>             // Create a ConnectionFactory
>>>             ActiveMQConnectionFactory* connectionFactory =
>>>                 new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID);
>>> 
>>>             // Create a Connection
>>>             connection =
>>> connectionFactory->createConnection();//user,passwd,sID);
>>>             delete connectionFactory;
>>>             connection->start();
>>> 
>>>             connection->setExceptionListener(this);
>>> 
>>>             // Create a Session
>>>             session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE
>>> );
>>> 			destination = session->createTopic( "mytopic" );
>>> 			consumer = session->createDurableConsumer(
>> destination ,
>>> user ,
>>> "",false);
>>> 
>>>             consumer->setMessageListener( this );
>>> 
>>>             Thread::sleep( waitMillis );
>>> 
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onMessage( const Message* message ){
>>> 
>>>         try
>>>         {
>>>     	    const TextMessage* textMessage =
>>>                 dynamic_cast< const TextMessage* >( message );
>>>             string text = textMessage->getText();
>>>             printf( "Received: %s\n", text.c_str() );
>>>         } catch (CMSException& e) {
>>>             e.printStackTrace();
>>>         }
>>>     }
>>> 
>>>     virtual void onException( const CMSException& ex ) {
>>>         printf("JMS Exception occured.  Shutting down client.\n");
>>>     }
>>> 
>>> private:
>>> 
>>>     void cleanup(){
>>> 
>>> 		// Destroy resources.
>>> 		try{
>>>         	if( destination != NULL ) delete destination;
>>> 		}catch (CMSException& e) {}
>>> 		destination = NULL;
>>> 
>>> 		try{
>>>             if( consumer != NULL ) delete consumer;
>>> 		}catch (CMSException& e) {}
>>> 		consumer = NULL;
>>> 
>>> 		// Close open resources.
>>> 		try{
>>> 			if( session != NULL ) session->close();
>>> 			if( connection != NULL ) connection->close();
>>> 		}catch (CMSException& e) {}
>>> 
>>>         try{
>>>         	if( session != NULL ) delete session;
>>> 		}catch (CMSException& e) {}
>>> 		session = NULL;
>>> 
>>> 		try{
>>>         	if( connection != NULL ) delete connection;
>>> 		}catch (CMSException& e) {}
>>> 		connection = NULL;
>>>     }
>>> };
>>>   void Produce()
>>>  {
>>>      HelloWorldProducer producer( 2 );
>>>      Thread producerThread( &producer );
>>>      producerThread.start();
>>>      producerThread.join();
>>>  }
>>>  void Consumer()
>>>  {
>>>       HelloWorldConsumer consumer( 10000 );
>>>       Thread consumerThread( &consumer );
>>>       consumerThread.start();
>>>       consumerThread.join();
>>>  }
>>>  int main(int argc, char* argv[])
>>>  {
>>>       Produce();
>>>       Consumer();
>>>  }
>>> --
>>> View this message in context: http://www.nabble.com/above-topic-
>>> tf2641566.html#a7373622
>>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> 
>> 
>> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7393074
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message