activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject Re: Message's live time
Date Thu, 30 Nov 2006 11:00:37 GMT

Sorry,according to your indication,message doesn't disappear.
I hope: 
send first,a few time later,I receive nothing. (a few time later,message
will disappear itself.) 
follow source code:
#include <activemq/concurrent/Thread.h>
#include <activemq/concurrent/Runnable.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Integer.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>

using namespace activemq::core;
using namespace activemq::util;
using namespace activemq::concurrent;
using namespace cms;
using namespace std;

#include <sys/timeb.h>
unsigned long getcurt()
{
	struct timeb t;
	ftime (&t);
	unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
	return timeStamp;
}

class HelloWorldProducer : public Runnable {
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageProducer* producer;
	int numMessages;

public:
	
	HelloWorldProducer( int numMessages ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	producer = NULL;
    	this->numMessages = numMessages;
	}
	
	virtual ~HelloWorldProducer(){
		cleanup();
	}
	
    virtual void run() {
        try {
			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection(user,passwd,sID);
            connection->start();

			string sss=connection->getClientId();
			cout << sss << endl;

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
			destination = session->createTopic( "mytopic" );

			producer = session->createProducer( destination );
            producer->setDeliveryMode( DeliveryMode::PERSISTANT );
            
			unsigned long ttt=getcurt();
			producer->setTimeToLive( 10000);

            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );
            
            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;
            
            for( int ix=0; ix<numMessages; ++ix ){
	            TextMessage* message = session->createTextMessage( text );

	//			message->setCMSTimeStamp(ttt);
	//			message->setCMSExpiration(ttt + 10000);		//消息到期时间
	//			string messageID="messageID";
	//			message->setCMSMessageId(messageID);		//消息ID
			//	producer->setTimeToLive(10000);


    	        // Tell the producer to send the message
        	    printf( "Sent message from thread %s\n", threadIdStr.c_str() );
        		producer->send( message );
            	
            	delete message;
            }
			
        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }
    
private:

    void cleanup(){
    				
			// Destroy resources.
			try{                        
            	if( destination != NULL ) delete destination;
			}catch ( CMSException& e ) {}
			destination = NULL;
			
			try{
	            if( producer != NULL ) delete producer;
			}catch ( CMSException& e ) {}
			producer = NULL;
			
    		// Close open resources.
    		try{
    			if( session != NULL ) session->close();
    			if( connection != NULL ) connection->close();
			}catch ( CMSException& e ) {}

			try{
            	if( session != NULL ) delete session;
			}catch ( CMSException& e ) {}
			session = NULL;
			
            try{
            	if( connection != NULL ) delete connection;
			}catch ( CMSException& e ) {}
    		connection = NULL;
    }
};

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener,
                           public Runnable {
	
private:
	
	Connection* connection;
	Session* session;
	Topic* destination;
	MessageConsumer* consumer;
	long waitMillis;
		
public: 

	HelloWorldConsumer( long waitMillis ){
		connection = NULL;
    	session = NULL;
    	destination = NULL;
    	consumer = NULL;
    	this->waitMillis = waitMillis;
	}
    virtual ~HelloWorldConsumer(){    	
    	cleanup();
    }
    
    virtual void run() {
    	    	
        try {

			string user,passwd,sID;
			user="default";
			passwd="";
			sID="lsgID";
            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID);

            // Create a Connection
            connection =
connectionFactory->createConnection();//user,passwd,sID);
            delete connectionFactory;
            connection->start();
            
            connection->setExceptionListener(this);

            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);

            // Create the destination (Topic or Queue)
			destination = session->createTopic( "mytopic?consumer.retroactive=true"
);

			consumer = session->createDurableConsumer( destination , user ,
"",false);
            
            consumer->setMessageListener( this );
            
            // Sleep while asynchronous messages come in.
            Thread::sleep( waitMillis );		
            
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }
    
    virtual void onMessage( const Message* message ){
    	
        try
        {
    	    const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message );
            string text = textMessage->getText();
            printf( "Received: %s\n", text.c_str() );
        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    virtual void onException( const CMSException& ex ) {
        printf("JMS Exception occured.  Shutting down client.\n");
    }
    
private:

    void cleanup(){
    	
		// Destroy resources.
		try{                        
        	if( destination != NULL ) delete destination;
		}catch (CMSException& e) {}
		destination = NULL;
		
		try{
            if( consumer != NULL ) delete consumer;
		}catch (CMSException& e) {}
		consumer = NULL;
		
		// Close open resources.
		try{
			if( session != NULL ) session->close();
			if( connection != NULL ) connection->close();
		}catch (CMSException& e) {}
		
        try{
        	if( session != NULL ) delete session;
		}catch (CMSException& e) {}
		session = NULL;
		
		try{
        	if( connection != NULL ) delete connection;
		}catch (CMSException& e) {}
		connection = NULL;
    }
};
  void Produce() 
 { 
     HelloWorldProducer producer( 2 ); 
     Thread producerThread( &producer ); 
     producerThread.start(); 
     producerThread.join(); 
 } 
 void Consumer() 
 { 
      HelloWorldConsumer consumer( 5000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 int main(int argc, char* argv[]) 
 { 
   //   Produce(); 

	  cout << "Produce End." << endl;

      Consumer(); 
 }
----------------------------------------
I made follow two experiments:
1.  In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
producer->setTimeToLive( 10000);

2. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
message yet.
unsigned long ttt=getcurt();
producer->setTimeToLive( ttt + 10000);

Each time, at first,I have commented out the line:Consumer() in main
function,and run program.
Second,wait 20 seconds.
Third,I have commented out the line:Produce() in main function,and run
program.
Both two experiment,I have received message.

That message don't disappear itself.
Why? 
Please help me.
-- 
View this message in context: http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message