activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Bish, Tim" <Tim.B...@Sensis.com>
Subject RE: above topic
Date Fri, 17 Nov 2006 12:06:16 GMT
> 
> How do I achieve a durable subscription based on my program?
> 

What is it you are trying to do?  What problem are you trying to solve?


> 
> 
> sgliu wrote:
> >
> > Thanks for your advice. I will read the content you pointed me to.
> >
> >
> >
> >
> > tabish121 wrote:
> >>
> >>>
> >>>
> >>> With the following program, why can't I receive data?
> >>> How should I modify the code?
> >>
> >> Looks like you produce before the durable consumer has been
> >> connected once.
> >>
> >> See this link:
> >>
> >> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
> >>
> >> A durable consumer must have connected and then disconnected before
> >> messages are persisted in anticipation of the consumer reconnecting.
> >> That's why it's important to use the same subscription name when
> >> reconnecting, so that the broker knows that the consumer that was
> >> subscribed as durable is now back and it can deliver the messages
> >> that were stored in its absence.
> >>
> >>>
> >>> #include <activemq/concurrent/Thread.h>
> >>> #include <activemq/concurrent/Runnable.h>
> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
> >>> #include <activemq/util/Integer.h>
> >>> #include <cms/Connection.h>
> >>> #include <cms/Session.h>
> >>> #include <cms/TextMessage.h>
> >>> #include <cms/ExceptionListener.h>
> >>> #include <cms/MessageListener.h>
> >>> #include <stdlib.h>
> >>>
> >>> using namespace activemq::core;
> >>> using namespace activemq::util;
> >>> using namespace activemq::concurrent;
> >>> using namespace cms;
> >>> using namespace std;
> >>>
> >>> class HelloWorldProducer : public Runnable {
> >>> private:
> >>>
> >>> 	Connection* connection;
> >>> 	Session* session;
> >>> 	Topic* destination;
> >>> 	MessageProducer* producer;
> >>> 	int numMessages;
> >>>
> >>> public:
> >>>
> >>> 	HelloWorldProducer( int numMessages ){
> >>> 		connection = NULL;
> >>>     	session = NULL;
> >>>     	destination = NULL;
> >>>     	producer = NULL;
> >>>     	this->numMessages = numMessages;
> >>> 	}
> >>>
> >>> 	virtual ~HelloWorldProducer(){
> >>> 		cleanup();
> >>> 	}
> >>>
> >>>     virtual void run() {
> >>>         try {
> >>> 			string user,passwd,sID;
> >>> 			user="default";
> >>> 			passwd="";
> >>> 			sID="lsgID";
> >>>             ActiveMQConnectionFactory* connectionFactory = new
> >>>
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
> >>>
> >>>             connection =
> >>> connectionFactory->createConnection(user,passwd,sID);
> >>>             connection->start();
> >>>
> >>> 			string sss=connection->getClientId();
> >>> 			cout << sss << endl;
> >>>
> >>>             session = connection->createSession(
> >> Session::AUTO_ACKNOWLEDGE
> >>> );
> >>> 			destination = session->createTopic( "mytopic" );
> >>>
> >>> 			producer = session->createProducer( destination
> >> );
> >>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
> >>>
> >>> 			producer->setTimeToLive(100000000);
> >>>             string threadIdStr = Integer::toString(
Thread::getId() );
> >>>
> >>>             // Create a messages
> >>>             string text = (string)"Hello world! from thread " +
> >>> threadIdStr;
> >>>
> >>>             for( int ix=0; ix<numMessages; ++ix ){
> >>> 	            TextMessage* message = session->createTextMessage(
> >> text
> >>> );
> >>>
> >>> 				string messageID="messageID";
> >>> 				message->setCMSExpiration(10000000000);
> >>> 				message->setCMSMessageId(messageID);
> >>>
> >>>     	        // Tell the producer to send the message
> >>>         	    printf( "Sent message from thread %s\n",
> >>> threadIdStr.c_str() );
> >>>         		producer->send( message );
> >>>
> >>>             	delete message;
> >>>             }
> >>>
> >>>         }catch ( CMSException& e ) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>> private:
> >>>
> >>>     void cleanup(){
> >>>
> >>> 			// Destroy resources.
> >>> 			try{
> >>>             	if( destination != NULL ) delete destination;
> >>> 			}catch ( CMSException& e ) {}
> >>> 			destination = NULL;
> >>>
> >>> 			try{
> >>> 	            if( producer != NULL ) delete producer;
> >>> 			}catch ( CMSException& e ) {}
> >>> 			producer = NULL;
> >>>
> >>>     		// Close open resources.
> >>>     		try{
> >>>     			if( session != NULL ) session->close();
> >>>     			if( connection != NULL )
connection->close();
> >>> 			}catch ( CMSException& e ) {}
> >>>
> >>> 			try{
> >>>             	if( session != NULL ) delete session;
> >>> 			}catch ( CMSException& e ) {}
> >>> 			session = NULL;
> >>>
> >>>             try{
> >>>             	if( connection != NULL ) delete connection;
> >>> 			}catch ( CMSException& e ) {}
> >>>     		connection = NULL;
> >>>     }
> >>> };
> >>>
> >>> class HelloWorldConsumer : public ExceptionListener,
> >>>                            public MessageListener,
> >>>                            public Runnable {
> >>>
> >>> private:
> >>>
> >>> 	Connection* connection;
> >>> 	Session* session;
> >>> 	Topic* destination;
> >>> 	MessageConsumer* consumer;
> >>> 	long waitMillis;
> >>>
> >>> public:
> >>>
> >>> 	HelloWorldConsumer( long waitMillis ){
> >>> 		connection = NULL;
> >>>     	session = NULL;
> >>>     	destination = NULL;
> >>>     	consumer = NULL;
> >>>     	this->waitMillis = waitMillis;
> >>> 	}
> >>>     virtual ~HelloWorldConsumer(){
> >>>     	cleanup();
> >>>     }
> >>>
> >>>     virtual void run() {
> >>>
> >>>         try {
> >>>
> >>> 			string user,passwd,sID;
> >>> 			user="default";
> >>> 			passwd="";
> >>> 			sID="lsgID";
> >>>             // Create a ConnectionFactory
> >>>             ActiveMQConnectionFactory* connectionFactory =
> >>>                 new ActiveMQConnectionFactory(
> >>> "tcp://localhost:61613",user,passwd,sID);
> >>>
> >>>             // Create a Connection
> >>>             connection =
> >>> connectionFactory->createConnection();//user,passwd,sID);
> >>>             delete connectionFactory;
> >>>             connection->start();
> >>>
> >>>             connection->setExceptionListener(this);
> >>>
> >>>             // Create a Session
> >>>             session = connection->createSession(
> >> Session::AUTO_ACKNOWLEDGE
> >>> );
> >>> 			destination = session->createTopic( "mytopic" );
> >>> 			consumer = session->createDurableConsumer(
> >> destination ,
> >>> user ,
> >>> "",false);
> >>>
> >>>             consumer->setMessageListener( this );
> >>>
> >>>             Thread::sleep( waitMillis );
> >>>
> >>>         } catch (CMSException& e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>>     virtual void onMessage( const Message* message ){
> >>>
> >>>         try
> >>>         {
> >>>     	    const TextMessage* textMessage =
> >>>                 dynamic_cast< const TextMessage* >( message );
> >>>             string text = textMessage->getText();
> >>>             printf( "Received: %s\n", text.c_str() );
> >>>         } catch (CMSException& e) {
> >>>             e.printStackTrace();
> >>>         }
> >>>     }
> >>>
> >>>     virtual void onException( const CMSException& ex ) {
> >>>         printf("JMS Exception occured.  Shutting down client.\n");
> >>>     }
> >>>
> >>> private:
> >>>
> >>>     void cleanup(){
> >>>
> >>> 		// Destroy resources.
> >>> 		try{
> >>>         	if( destination != NULL ) delete destination;
> >>> 		}catch (CMSException& e) {}
> >>> 		destination = NULL;
> >>>
> >>> 		try{
> >>>             if( consumer != NULL ) delete consumer;
> >>> 		}catch (CMSException& e) {}
> >>> 		consumer = NULL;
> >>>
> >>> 		// Close open resources.
> >>> 		try{
> >>> 			if( session != NULL ) session->close();
> >>> 			if( connection != NULL ) connection->close();
> >>> 		}catch (CMSException& e) {}
> >>>
> >>>         try{
> >>>         	if( session != NULL ) delete session;
> >>> 		}catch (CMSException& e) {}
> >>> 		session = NULL;
> >>>
> >>> 		try{
> >>>         	if( connection != NULL ) delete connection;
> >>> 		}catch (CMSException& e) {}
> >>> 		connection = NULL;
> >>>     }
> >>> };
> >>>   void Produce()
> >>>  {
> >>>      HelloWorldProducer producer( 2 );
> >>>      Thread producerThread( &producer );
> >>>      producerThread.start();
> >>>      producerThread.join();
> >>>  }
> >>>  void Consumer()
> >>>  {
> >>>       HelloWorldConsumer consumer( 10000 );
> >>>       Thread consumerThread( &consumer );
> >>>       consumerThread.start();
> >>>       consumerThread.join();
> >>>  }
> >>>  int main(int argc, char* argv[])
> >>>  {
> >>>       Produce();
> >>>       Consumer();
> >>>  }
> >>> --
> >>> View this message in context: http://www.nabble.com/above-topic-
> >>> tf2641566.html#a7373622
> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> >>
> >>
> >>
> >
> >
> 
> --
> View this message in context: http://www.nabble.com/above-topic-
> tf2641566.html#a7393074
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message