activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject RE: above topic
Date Fri, 17 Nov 2006 01:48:58 GMT

Thanks for your advice. I will read the content you pointed me to.




tabish121 wrote:
> 
>> 
>> 
>> Why can't the following program receive data?
>> How should I modify the code?
> 
> Looks like you produce before the durable consumer has been connected
> once. 
> 
> See this link:
> http://www.activemq.org/site/how-do-durable-queues-and-topics-work.html
> 
> A durable consumer must have connected and then disconnected before
> messages are persisted in anticipation of the consumer reconnecting.
> That's why it's important to use the same subscription name when
> reconnecting, so that the broker knows that the consumer that was
> subscribed as durable is now back and it can deliver the messages that
> were stored in its absence.  
> 
>> 
>> #include <activemq/concurrent/Thread.h>
>> #include <activemq/concurrent/Runnable.h>
>> #include <activemq/core/ActiveMQConnectionFactory.h>
>> #include <activemq/util/Integer.h>
>> #include <cms/Connection.h>
>> #include <cms/Session.h>
>> #include <cms/TextMessage.h>
>> #include <cms/ExceptionListener.h>
>> #include <cms/MessageListener.h>
>> #include <stdlib.h>
>> 
>> using namespace activemq::core;
>> using namespace activemq::util;
>> using namespace activemq::concurrent;
>> using namespace cms;
>> using namespace std;
>> 
>> /**
>>  * Runnable that connects to the broker at tcp://localhost:61613, publishes
>>  * numMessages persistent text messages to the topic "mytopic", and releases
>>  * its CMS resources when destroyed.
>>  */
>> class HelloWorldProducer : public Runnable {
>> private:
>> 
>>     Connection* connection;
>>     Session* session;
>>     Topic* destination;
>>     MessageProducer* producer;
>>     int numMessages;
>> 
>> public:
>> 
>>     /**
>>      * @param numMessages number of text messages that run() sends.
>>      */
>>     HelloWorldProducer( int numMessages )
>>     :   connection( NULL ),
>>         session( NULL ),
>>         destination( NULL ),
>>         producer( NULL ),
>>         numMessages( numMessages ) {
>>     }
>> 
>>     virtual ~HelloWorldProducer(){
>>         cleanup();
>>     }
>> 
>>     /**
>>      * Opens the connection, session, topic and producer, then sends
>>      * numMessages text messages. Any CMSException is caught here and its
>>      * stack trace printed; the resources opened so far are released later
>>      * by the destructor.
>>      */
>>     virtual void run() {
>>         try {
>>             string user = "default";
>>             string passwd = "";
>>             string sID = "lsgID";
>> 
>>             // The factory is only needed to create the connection. Keeping
>>             // it on the stack releases it on every path, including when
>>             // createConnection() throws.
>>             ActiveMQConnectionFactory connectionFactory(
>>                 "tcp://localhost:61613", user, passwd, sID );
>> 
>>             connection = connectionFactory.createConnection( user, passwd, sID );
>>             connection->start();
>> 
>>             // Print the client id the connection actually ended up with.
>>             string sss = connection->getClientId();
>>             cout << sss << endl;
>> 
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>             destination = session->createTopic( "mytopic" );
>> 
>>             producer = session->createProducer( destination );
>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>>             producer->setTimeToLive( 100000000 );
>> 
>>             string threadIdStr = Integer::toString( Thread::getId() );
>> 
>>             // Every message carries the same text.
>>             string text = string( "Hello world! from thread " ) + threadIdStr;
>> 
>>             for( int ix = 0; ix < numMessages; ++ix ) {
>>                 TextMessage* message = session->createTextMessage( text );
>>                 try {
>>                     string messageID = "messageID";
>>                     message->setCMSExpiration( 10000000000 );
>>                     message->setCMSMessageId( messageID );
>> 
>>                     // Tell the producer to send the message
>>                     printf( "Sent message from thread %s\n", threadIdStr.c_str() );
>>                     producer->send( message );
>>                 } catch ( ... ) {
>>                     // Do not leak the message when a setter or send() throws.
>>                     delete message;
>>                     throw;
>>                 }
>>                 delete message;
>>             }
>> 
>>         } catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>> private:
>> 
>>     /**
>>      * Releases every CMS resource owned by this object. Each step is
>>      * guarded separately so a CMSException from one step does not prevent
>>      * the remaining steps from running.
>>      */
>>     void cleanup(){
>> 
>>         // Destroy resources (deleting a NULL pointer is a no-op).
>>         try {
>>             delete destination;
>>         } catch ( CMSException& ) {}
>>         destination = NULL;
>> 
>>         try {
>>             delete producer;
>>         } catch ( CMSException& ) {}
>>         producer = NULL;
>> 
>>         // Close open resources; a failing session close must not skip
>>         // closing the connection.
>>         try {
>>             if( session != NULL ) session->close();
>>         } catch ( CMSException& ) {}
>> 
>>         try {
>>             if( connection != NULL ) connection->close();
>>         } catch ( CMSException& ) {}
>> 
>>         try {
>>             delete session;
>>         } catch ( CMSException& ) {}
>>         session = NULL;
>> 
>>         try {
>>             delete connection;
>>         } catch ( CMSException& ) {}
>>         connection = NULL;
>>     }
>> };
>> 
>> /**
>>  * Runnable that connects to the broker at tcp://localhost:61613, creates a
>>  * durable consumer on the topic "mytopic", prints every text message it
>>  * receives for waitMillis milliseconds, and releases its CMS resources
>>  * when destroyed.
>>  */
>> class HelloWorldConsumer : public ExceptionListener,
>>                            public MessageListener,
>>                            public Runnable {
>> 
>> private:
>> 
>>     Connection* connection;
>>     Session* session;
>>     Topic* destination;
>>     MessageConsumer* consumer;
>>     long waitMillis;
>> 
>> public:
>> 
>>     /**
>>      * @param waitMillis how long run() sleeps after subscribing, in
>>      *        milliseconds, before returning.
>>      */
>>     HelloWorldConsumer( long waitMillis )
>>     :   connection( NULL ),
>>         session( NULL ),
>>         destination( NULL ),
>>         consumer( NULL ),
>>         waitMillis( waitMillis ) {
>>     }
>> 
>>     virtual ~HelloWorldConsumer(){
>>         cleanup();
>>     }
>> 
>>     /**
>>      * Opens the connection and session, creates the durable consumer,
>>      * registers this object as its message listener and then sleeps for
>>      * waitMillis. Any CMSException is caught here and its stack trace
>>      * printed.
>>      */
>>     virtual void run() {
>> 
>>         try {
>> 
>>             string user = "default";
>>             string passwd = "";
>>             string sID = "lsgID";
>> 
>>             // Create a ConnectionFactory. It is only needed to create the
>>             // connection; keeping it on the stack releases it even when
>>             // createConnection() throws.
>>             ActiveMQConnectionFactory connectionFactory(
>>                 "tcp://localhost:61613", user, passwd, sID );
>> 
>>             // Create a Connection. The no-argument overload is used here;
>>             // presumably it falls back to the user, password and client id
>>             // given to the factory -- TODO confirm.
>>             connection = connectionFactory.createConnection();
>>             connection->start();
>> 
>>             connection->setExceptionListener( this );
>> 
>>             // Create a Session
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>             destination = session->createTopic( "mytopic" );
>> 
>>             // The second argument (user, i.e. "default") is passed as the
>>             // subscription name; the third is an empty selector string.
>>             consumer = session->createDurableConsumer(
>>                 destination, user, "", false );
>> 
>>             consumer->setMessageListener( this );
>> 
>>             // Stay subscribed for waitMillis; onMessage() is expected to
>>             // be called in the meantime.
>>             Thread::sleep( waitMillis );
>> 
>>         } catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     /**
>>      * Prints the text of a received message. Messages that are NULL or
>>      * not a TextMessage are reported and skipped instead of being
>>      * dereferenced.
>>      *
>>      * @param message the message handed over by the consumer.
>>      */
>>     virtual void onMessage( const Message* message ){
>> 
>>         try
>>         {
>>             const TextMessage* textMessage =
>>                 dynamic_cast< const TextMessage* >( message );
>> 
>>             // dynamic_cast yields NULL for a NULL or non-text message.
>>             if( textMessage == NULL ) {
>>                 printf( "Received a message that is not a TextMessage; ignored.\n" );
>>                 return;
>>             }
>> 
>>             string text = textMessage->getText();
>>             printf( "Received: %s\n", text.c_str() );
>>         } catch ( CMSException& e ) {
>>             e.printStackTrace();
>>         }
>>     }
>> 
>>     /**
>>      * Reports that an exception was signalled on the connection. Only a
>>      * fixed notice is printed; the exception itself is not inspected.
>>      */
>>     virtual void onException( const CMSException& ex ) {
>>         printf("JMS Exception occured.  Shutting down client.\n");
>>     }
>> 
>> private:
>> 
>>     /**
>>      * Releases every CMS resource owned by this object. Each step is
>>      * guarded separately so a CMSException from one step does not prevent
>>      * the remaining steps from running.
>>      */
>>     void cleanup(){
>> 
>>         // Destroy resources (deleting a NULL pointer is a no-op).
>>         try {
>>             delete destination;
>>         } catch ( CMSException& ) {}
>>         destination = NULL;
>> 
>>         try {
>>             delete consumer;
>>         } catch ( CMSException& ) {}
>>         consumer = NULL;
>> 
>>         // Close open resources; a failing session close must not skip
>>         // closing the connection.
>>         try {
>>             if( session != NULL ) session->close();
>>         } catch ( CMSException& ) {}
>> 
>>         try {
>>             if( connection != NULL ) connection->close();
>>         } catch ( CMSException& ) {}
>> 
>>         try {
>>             delete session;
>>         } catch ( CMSException& ) {}
>>         session = NULL;
>> 
>>         try {
>>             delete connection;
>>         } catch ( CMSException& ) {}
>>         connection = NULL;
>>     }
>> };
>>   // Runs a HelloWorldProducer configured for two messages on its own
>>   // thread and waits for that thread to finish.
>>   void Produce()
>>  {
>>      HelloWorldProducer sender( 2 );
>> 
>>      Thread senderThread( &sender );
>>      senderThread.start();
>>      senderThread.join();
>>  }
>>  // Runs a HelloWorldConsumer with a 10000 ms wait on its own thread and
>>  // waits for that thread to finish.
>>  void Consumer()
>>  {
>>       HelloWorldConsumer receiver( 10000 );
>> 
>>       Thread receiverThread( &receiver );
>>       receiverThread.start();
>>       receiverThread.join();
>>  }
>>  // Entry point: registers the durable subscription, publishes, then
>>  // reconnects the subscriber to collect what was published.
>>  int main(int argc, char* argv[])
>>  {
>>       // A broker only stores topic messages for a durable subscription
>>       // that already exists, so the consumer has to connect (and then
>>       // disconnect) once before anything is produced.
>>       Consumer();
>> 
>>       // Publish while the durable subscriber is offline.
>>       Produce();
>> 
>>       // Reconnect with the same subscription name to receive the
>>       // messages stored in the meantime.
>>       Consumer();
>> 
>>       return 0;
>>  }
>> --
>> View this message in context: http://www.nabble.com/above-topic-
>> tf2641566.html#a7373622
>> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/above-topic-tf2641566.html#a7392444
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message