activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Nathan Mittler" <nathan.mitt...@gmail.com>
Subject Re: Message's live time
Date Fri, 01 Dec 2006 05:08:41 GMT
Just so you know I haven't forgotten about you :) ...

I have replicated the problem and I believe it is an issue with the broker.
I'm going to dig into it more later.


On 11/30/06, sgliu <shengguo@sina.com> wrote:
>
>
> Sorry,according to your indication,message doesn't disappear.
> I hope:
> send first,a few time later,I receive nothing. (a few time later,message
> will disappear itself.)
> follow source code:
> #include <activemq/concurrent/Thread.h>
> #include <activemq/concurrent/Runnable.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/util/Integer.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
>
> using namespace activemq::core;
> using namespace activemq::util;
> using namespace activemq::concurrent;
> using namespace cms;
> using namespace std;
>
> #include <sys/timeb.h>
> unsigned long getcurt()
> {
>         struct timeb t;
>         ftime (&t);
>         unsigned long timeStamp = (t.time * 1000LL) + t.millitm;
>         return timeStamp;
> }
>
> class HelloWorldProducer : public Runnable {
> private:
>
>         Connection* connection;
>         Session* session;
>         Topic* destination;
>         MessageProducer* producer;
>         int numMessages;
>
> public:
>
>         HelloWorldProducer( int numMessages ){
>                 connection = NULL;
>         session = NULL;
>         destination = NULL;
>         producer = NULL;
>         this->numMessages = numMessages;
>         }
>
>         virtual ~HelloWorldProducer(){
>                 cleanup();
>         }
>
>     virtual void run() {
>         try {
>                         string user,passwd,sID;
>                         user="default";
>                         passwd="";
>                         sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID);
>
>             // Create a Connection
>             connection =
> connectionFactory->createConnection(user,passwd,sID);
>             connection->start();
>
>                         string sss=connection->getClientId();
>                         cout << sss << endl;
>
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>                         destination = session->createTopic( "mytopic" );
>
>                         producer = session->createProducer( destination );
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>
>                         unsigned long ttt=getcurt();
>                         producer->setTimeToLive( 10000);
>
>             // Create the Thread Id String
>             string threadIdStr = Integer::toString( Thread::getId() );
>
>             // Create a messages
>             string text = (string)"Hello world! from thread " +
> threadIdStr;
>
>             for( int ix=0; ix<numMessages; ++ix ){
>                     TextMessage* message = session->createTextMessage(
> text );
>
>         //                      message->setCMSTimeStamp(ttt);
>         //                      message->setCMSExpiration(ttt +
> 10000);         //消息到期时间
>         //                      string messageID="messageID";
>
>         //                      message->setCMSMessageId(messageID);            //消息ID
>                         //      producer->setTimeToLive(10000);
>
>
>                 // Tell the producer to send the message
>                     printf( "Sent message from thread %s\n",
> threadIdStr.c_str() );
>                         producer->send( message );
>
>                 delete message;
>             }
>
>         }catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
> private:
>
>     void cleanup(){
>
>                         // Destroy resources.
>                         try{
>                 if( destination != NULL ) delete destination;
>                         }catch ( CMSException& e ) {}
>                         destination = NULL;
>
>                         try{
>                     if( producer != NULL ) delete producer;
>                         }catch ( CMSException& e ) {}
>                         producer = NULL;
>
>                 // Close open resources.
>                 try{
>                         if( session != NULL ) session->close();
>                         if( connection != NULL ) connection->close();
>                         }catch ( CMSException& e ) {}
>
>                         try{
>                 if( session != NULL ) delete session;
>                         }catch ( CMSException& e ) {}
>                         session = NULL;
>
>             try{
>                 if( connection != NULL ) delete connection;
>                         }catch ( CMSException& e ) {}
>                 connection = NULL;
>     }
> };
>
> class HelloWorldConsumer : public ExceptionListener,
>                            public MessageListener,
>                            public Runnable {
>
> private:
>
>         Connection* connection;
>         Session* session;
>         Topic* destination;
>         MessageConsumer* consumer;
>         long waitMillis;
>
> public:
>
>         HelloWorldConsumer( long waitMillis ){
>                 connection = NULL;
>         session = NULL;
>         destination = NULL;
>         consumer = NULL;
>         this->waitMillis = waitMillis;
>         }
>     virtual ~HelloWorldConsumer(){
>         cleanup();
>     }
>
>     virtual void run() {
>
>         try {
>
>                         string user,passwd,sID;
>                         user="default";
>                         passwd="";
>                         sID="lsgID";
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory =
>                 new ActiveMQConnectionFactory(
> "tcp://localhost:61613",user,passwd,sID);
>
>             // Create a Connection
>             connection =
> connectionFactory->createConnection();//user,passwd,sID);
>             delete connectionFactory;
>             connection->start();
>
>             connection->setExceptionListener(this);
>
>             // Create a Session
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>
>             // Create the destination (Topic or Queue)
>                         destination = session->createTopic(
> "mytopic?consumer.retroactive=true"
> );
>
>                         consumer = session->createDurableConsumer(
> destination , user ,
> "",false);
>
>             consumer->setMessageListener( this );
>
>             // Sleep while asynchronous messages come in.
>             Thread::sleep( waitMillis );
>
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onMessage( const Message* message ){
>
>         try
>         {
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
>
>     virtual void onException( const CMSException& ex ) {
>         printf("JMS Exception occured.  Shutting down client.\n");
>     }
>
> private:
>
>     void cleanup(){
>
>                 // Destroy resources.
>                 try{
>                 if( destination != NULL ) delete destination;
>                 }catch (CMSException& e) {}
>                 destination = NULL;
>
>                 try{
>             if( consumer != NULL ) delete consumer;
>                 }catch (CMSException& e) {}
>                 consumer = NULL;
>
>                 // Close open resources.
>                 try{
>                         if( session != NULL ) session->close();
>                         if( connection != NULL ) connection->close();
>                 }catch (CMSException& e) {}
>
>         try{
>                 if( session != NULL ) delete session;
>                 }catch (CMSException& e) {}
>                 session = NULL;
>
>                 try{
>                 if( connection != NULL ) delete connection;
>                 }catch (CMSException& e) {}
>                 connection = NULL;
>     }
> };
>   void Produce()
> {
>      HelloWorldProducer producer( 2 );
>      Thread producerThread( &producer );
>      producerThread.start();
>      producerThread.join();
> }
> void Consumer()
> {
>       HelloWorldConsumer consumer( 5000 );
>       Thread consumerThread( &consumer );
>       consumerThread.start();
>       consumerThread.join();
> }
> int main(int argc, char* argv[])
> {
>    //   Produce();
>
>           cout << "Produce End." << endl;
>
>       Consumer();
> }
> ----------------------------------------
> I made follow two experiments:
> 1.  In HelloWorldProducer,add follow code,but 11 seconds later, I receive
> message yet.
> producer->setTimeToLive( 10000);
>
> 2. In HelloWorldProducer,add follow code,but 11 seconds later, I receive
> message yet.
> unsigned long ttt=getcurt();
> producer->setTimeToLive( ttt + 10000);
>
> Each time, at first,I have commented out the line:Consumer() in main
> function,and run program.
> Second,wait 20 seconds.
> Third,I have commented out the line:Produce() in main function,and run
> program.
> Both two experiment,I have received message.
>
> That message don't disappear itself.
> Why?
> Please help me.
> --
> View this message in context:
> http://www.nabble.com/Message%27s-live-time-tf2706004.html#a7617317
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.
>
>
Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message