activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject RE: above topic
Date Sat, 18 Nov 2006 00:57:32 GMT

I want to produce message A and then exit the producer program. Then I want
to start two consumer programs (one is C1, the other is C2) at the same time,
so that C1 can receive A and C2 can also receive A.



tabish121 wrote:
> 
>> 
>> How do I achieve a durable subscription based on my program?
>> 
> 
> What is it you are trying to do?  What problem are you trying to solve?
> 
> 
>> 
>> 
>> sgliu wrote:
>> >
>> > Thanks for your advice. I will read the content you pointed me to.
>> >
>> >
>> >
>> >
>> > tabish121 wrote:
>> >>
>> >>>
>> >>>
>> >>> With the following program, why can't I receive data?
>> >>> How should I modify the code?
>> >>
>> >> Looks like you produce before the durable consumer has been connected
>> >> once.
>> >>
>> >> See this link:
>> >>
>> >> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
>> >>
>> >> A durable consumer must have connected and then disconnected before
>> >> messages are persisted in anticipation of the consumer reconnecting.
>> >> That's why it's important to use the same subscription name when
>> >> reconnecting, so that the broker knows that the consumer that was
>> >> subscribed as durable is now back and it can deliver the messages that
>> >> were stored in its absence.
>> >>
>> >>>
>> >>> #include <activemq/concurrent/Thread.h>
>> >>> #include <activemq/concurrent/Runnable.h>
>> >>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> >>> #include <activemq/util/Integer.h>
>> >>> #include <cms/Connection.h>
>> >>> #include <cms/Session.h>
>> >>> #include <cms/TextMessage.h>
>> >>> #include <cms/ExceptionListener.h>
>> >>> #include <cms/MessageListener.h>
>> >>> #include <stdlib.h>
>> >>>
>> >>> using namespace activemq::core;
>> >>> using namespace activemq::util;
>> >>> using namespace activemq::concurrent;
>> >>> using namespace cms;
>> >>> using namespace std;
>> >>>
>> >>> class HelloWorldProducer : public Runnable {
>> >>> private:
>> >>>
>> >>> 	Connection* connection;
>> >>> 	Session* session;
>> >>> 	Topic* destination;
>> >>> 	MessageProducer* producer;
>> >>> 	int numMessages;
>> >>>
>> >>> public:
>> >>>
>> >>> 	HelloWorldProducer( int numMessages ){
>> >>> 		connection = NULL;
>> >>>     	session = NULL;
>> >>>     	destination = NULL;
>> >>>     	producer = NULL;
>> >>>     	this->numMessages = numMessages;
>> >>> 	}
>> >>>
>> >>> 	virtual ~HelloWorldProducer(){
>> >>> 		cleanup();
>> >>> 	}
>> >>>
>> >>>     virtual void run() {
>> >>>         try {
>> >>> 			string user,passwd,sID;
>> >>> 			user="default";
>> >>> 			passwd="";
>> >>> 			sID="lsgID";
>> >>>             ActiveMQConnectionFactory* connectionFactory = new
>> >>>
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             connection =
>> >>> connectionFactory->createConnection(user,passwd,sID);
>> >>>             connection->start();
>> >>>
>> >>> 			string sss=connection->getClientId();
>> >>> 			cout << sss << endl;
>> >>>
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> 			destination = session->createTopic( "mytopic" );
>> >>>
>> >>> 			producer = session->createProducer( destination
>> >> );
>> >>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>> >>>
>> >>> 			producer->setTimeToLive(100000000);
>> >>>             string threadIdStr = Integer::toString(
> Thread::getId() );
>> >>>
>> >>>             // Create a messages
>> >>>             string text = (string)"Hello world! from thread " +
>> >>> threadIdStr;
>> >>>
>> >>>             for( int ix=0; ix<numMessages; ++ix ){
>> >>> 	            TextMessage* message = session->createTextMessage(
>> >> text
>> >>> );
>> >>>
>> >>> 				string messageID="messageID";
>> >>> 				message->setCMSExpiration(10000000000);
>> >>> 				message->setCMSMessageId(messageID);
>> >>>
>> >>>     	        // Tell the producer to send the message
>> >>>         	    printf( "Sent message from thread %s\n",
>> >>> threadIdStr.c_str() );
>> >>>         		producer->send( message );
>> >>>
>> >>>             	delete message;
>> >>>             }
>> >>>
>> >>>         }catch ( CMSException& e ) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>> 			// Destroy resources.
>> >>> 			try{
>> >>>             	if( destination != NULL ) delete destination;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			destination = NULL;
>> >>>
>> >>> 			try{
>> >>> 	            if( producer != NULL ) delete producer;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			producer = NULL;
>> >>>
>> >>>     		// Close open resources.
>> >>>     		try{
>> >>>     			if( session != NULL ) session->close();
>> >>>     			if( connection != NULL )
> connection->close();
>> >>> 			}catch ( CMSException& e ) {}
>> >>>
>> >>> 			try{
>> >>>             	if( session != NULL ) delete session;
>> >>> 			}catch ( CMSException& e ) {}
>> >>> 			session = NULL;
>> >>>
>> >>>             try{
>> >>>             	if( connection != NULL ) delete connection;
>> >>> 			}catch ( CMSException& e ) {}
>> >>>     		connection = NULL;
>> >>>     }
>> >>> };
>> >>>
>> >>> class HelloWorldConsumer : public ExceptionListener,
>> >>>                            public MessageListener,
>> >>>                            public Runnable {
>> >>>
>> >>> private:
>> >>>
>> >>> 	Connection* connection;
>> >>> 	Session* session;
>> >>> 	Topic* destination;
>> >>> 	MessageConsumer* consumer;
>> >>> 	long waitMillis;
>> >>>
>> >>> public:
>> >>>
>> >>> 	HelloWorldConsumer( long waitMillis ){
>> >>> 		connection = NULL;
>> >>>     	session = NULL;
>> >>>     	destination = NULL;
>> >>>     	consumer = NULL;
>> >>>     	this->waitMillis = waitMillis;
>> >>> 	}
>> >>>     virtual ~HelloWorldConsumer(){
>> >>>     	cleanup();
>> >>>     }
>> >>>
>> >>>     virtual void run() {
>> >>>
>> >>>         try {
>> >>>
>> >>> 			string user,passwd,sID;
>> >>> 			user="default";
>> >>> 			passwd="";
>> >>> 			sID="lsgID";
>> >>>             // Create a ConnectionFactory
>> >>>             ActiveMQConnectionFactory* connectionFactory =
>> >>>                 new ActiveMQConnectionFactory(
>> >>> "tcp://localhost:61613",user,passwd,sID);
>> >>>
>> >>>             // Create a Connection
>> >>>             connection =
>> >>> connectionFactory->createConnection();//user,passwd,sID);
>> >>>             delete connectionFactory;
>> >>>             connection->start();
>> >>>
>> >>>             connection->setExceptionListener(this);
>> >>>
>> >>>             // Create a Session
>> >>>             session = connection->createSession(
>> >> Session::AUTO_ACKNOWLEDGE
>> >>> );
>> >>> 			destination = session->createTopic( "mytopic" );
>> >>> 			consumer = session->createDurableConsumer(
>> >> destination ,
>> >>> user ,
>> >>> "",false);
>> >>>
>> >>>             consumer->setMessageListener( this );
>> >>>
>> >>>             Thread::sleep( waitMillis );
>> >>>
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onMessage( const Message* message ){
>> >>>
>> >>>         try
>> >>>         {
>> >>>     	    const TextMessage* textMessage =
>> >>>                 dynamic_cast< const TextMessage* >( message );
>> >>>             string text = textMessage->getText();
>> >>>             printf( "Received: %s\n", text.c_str() );
>> >>>         } catch (CMSException& e) {
>> >>>             e.printStackTrace();
>> >>>         }
>> >>>     }
>> >>>
>> >>>     virtual void onException( const CMSException& ex ) {
>> >>>         printf("JMS Exception occured.  Shutting down client.\n");
>> >>>     }
>> >>>
>> >>> private:
>> >>>
>> >>>     void cleanup(){
>> >>>
>> >>> 		// Destroy resources.
>> >>> 		try{
>> >>>         	if( destination != NULL ) delete destination;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		destination = NULL;
>> >>>
>> >>> 		try{
>> >>>             if( consumer != NULL ) delete consumer;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		consumer = NULL;
>> >>>
>> >>> 		// Close open resources.
>> >>> 		try{
>> >>> 			if( session != NULL ) session->close();
>> >>> 			if( connection != NULL ) connection->close();
>> >>> 		}catch (CMSException& e) {}
>> >>>
>> >>>         try{
>> >>>         	if( session != NULL ) delete session;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		session = NULL;
>> >>>
>> >>> 		try{
>> >>>         	if( connection != NULL ) delete connection;
>> >>> 		}catch (CMSException& e) {}
>> >>> 		connection = NULL;
>> >>>     }
>> >>> };
>> >>>   void Produce()
>> >>>  {
>> >>>      HelloWorldProducer producer( 2 );
>> >>>      Thread producerThread( &producer );
>> >>>      producerThread.start();
>> >>>      producerThread.join();
>> >>>  }
>> >>>  void Consumer()
>> >>>  {
>> >>>       HelloWorldConsumer consumer( 10000 );
>> >>>       Thread consumerThread( &consumer );
>> >>>       consumerThread.start();
>> >>>       consumerThread.join();
>> >>>  }
>> >>>  int main(int argc, char* argv[])
>> >>>  {
>> >>>       Produce();
>> >>>       Consumer();
>> >>>  }
>> >>> --
>> >>> View this message in context: http://www.nabble.com/above-topic-
>> >>> tf2641566.html#a7373622
>> >>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>> >>
>> >>
>> >>
>> >
>> >
>> 
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7393074
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7413840
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message