activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject Re: SOS
Date Sat, 25 Nov 2006 05:11:39 GMT

"retroactive consumers",
follow code,just I want.
queue = new ActiveMQQueue("TEST.QUEUE?consumer.retroactive=true");
consumer = session.createConsumer(queue);

Thanks a lot.



Adrian Co wrote:
> 
> I do not understand what you want. If you want to send first, then still 
> receive later using topics, you could try using retroactive consumers: 
> http://www.activemq.org/site/retroactive-consumer.html or create durable 
> subscribers for more reliable messaging,
> 
> sgliu wrote:
>> Send first,then receive.In topic.setCMSExpiration() function?
>> What can I do?
>>
>>
>>
>> Adrian Co wrote:
>>   
>>> Try subscribing the consumers first before sending messages.
>>>
>>> int main(int argc, char* argv[]) 
>>>  { 
>>>       Consumer1(); 
>>>       Consumer2(); 
>>>
>>>       Produce(); 
>>>
>>>  }
>>>
>>>
>>>
>>> sgliu wrote:
>>>     
>>>> Please help me.
>>>>
>>>>
>>>> sgliu wrote:
>>>>   
>>>>       
>>>>> I wish I producer  message A, and then I exit the producer program.
>>>>> Then
>>>>> I
>>>>> start two consumer program(one is C1,the other is C2) at same time.C1

>>>>> can
>>>>> receive A , C2 can receive A.
>>>>>
>>>>> #include <activemq/concurrent/Thread.h> 
>>>>> #include <activemq/concurrent/Runnable.h> 
>>>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>>>> #include <activemq/util/Integer.h> 
>>>>> #include <cms/Connection.h> 
>>>>> #include <cms/Session.h> 
>>>>> #include <cms/TextMessage.h> 
>>>>> #include <cms/ExceptionListener.h> 
>>>>> #include <cms/MessageListener.h> 
>>>>> #include <stdlib.h> 
>>>>>
>>>>> using namespace activemq::core; 
>>>>> using namespace activemq::util; 
>>>>> using namespace activemq::concurrent; 
>>>>> using namespace cms; 
>>>>> using namespace std; 
>>>>>
>>>>> class HelloWorldProducer : public Runnable { 
>>>>> private: 
>>>>>         
>>>>>         Connection* connection; 
>>>>>         Session* session; 
>>>>>         Topic* destination; 
>>>>>         MessageProducer* producer; 
>>>>>         int numMessages; 
>>>>>
>>>>> public: 
>>>>>         
>>>>>         HelloWorldProducer( int numMessages ){ 
>>>>>                 connection = NULL; 
>>>>>     session = NULL; 
>>>>>     destination = NULL; 
>>>>>     producer = NULL; 
>>>>>     this->numMessages = numMessages; 
>>>>>         } 
>>>>>         
>>>>>         virtual ~HelloWorldProducer(){ 
>>>>>                 cleanup(); 
>>>>>         } 
>>>>>         
>>>>>     virtual void run() { 
>>>>>         try { 
>>>>>                         string user,passwd,sID; 
>>>>>                         user="default"; 
>>>>>                         passwd=""; 
>>>>>                         sID="lsgID"; 
>>>>>             ActiveMQConnectionFactory* connectionFactory = new
>>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>>>
>>>>>             connection =
>>>>> connectionFactory->createConnection(user,passwd,sID); 
>>>>>             connection->start(); 
>>>>>
>>>>>                         string sss=connection->getClientId(); 
>>>>>                         cout << sss << endl; 
>>>>>
>>>>>             session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> ); 
>>>>>                         destination = session->createTopic( "mytopic"
>>>>> ); 
>>>>>
>>>>>                         producer = session->createProducer(
>>>>> destination
>>>>> ); 
>>>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );

>>>>>             
>>>>>                         producer->setTimeToLive(100000000); 
>>>>>             string threadIdStr = Integer::toString( Thread::getId() );

>>>>>             
>>>>>             // Create a messages 
>>>>>             string text = (string)"Hello world! from thread " +
>>>>> threadIdStr; 
>>>>>             
>>>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>>>                     TextMessage* message = session->createTextMessage(
>>>>> text ); 
>>>>>
>>>>>                                 string messageID="messageID"; 
>>>>>                                
>>>>> message->setCMSExpiration(10000000000); 
>>>>>                                 message->setCMSMessageId(messageID);

>>>>>
>>>>>            // Tell the producer to send the message 
>>>>>            printf( "Sent message from thread %s\n",
>>>>> threadIdStr.c_str()
>>>>> ); 
>>>>>         producer->send( message ); 
>>>>>             
>>>>>             delete message; 
>>>>>             } 
>>>>>                         
>>>>>         }catch ( CMSException& e ) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>     
>>>>> private: 
>>>>>
>>>>>     void cleanup(){ 
>>>>>     
>>>>>                         // Destroy resources. 
>>>>>                         try{                         
>>>>>             if( destination != NULL ) delete destination; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         destination = NULL; 
>>>>>                         
>>>>>                         try{ 
>>>>>                     if( producer != NULL ) delete producer; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         producer = NULL; 
>>>>>                         
>>>>>     // Close open resources. 
>>>>>     try{ 
>>>>>     if( session != NULL ) session->close(); 
>>>>>     if( connection != NULL ) connection->close(); 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>
>>>>>                         try{ 
>>>>>             if( session != NULL ) delete session; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>                         session = NULL; 
>>>>>                         
>>>>>             try{ 
>>>>>             if( connection != NULL ) delete connection; 
>>>>>                         }catch ( CMSException& e ) {} 
>>>>>     connection = NULL; 
>>>>>     } 
>>>>> }; 
>>>>>
>>>>> class HelloWorldConsumer : public ExceptionListener, 
>>>>>                            public MessageListener, 
>>>>>                            public Runnable { 
>>>>>         
>>>>> private: 
>>>>>         
>>>>>         Connection* connection; 
>>>>>         Session* session; 
>>>>>         Topic* destination; 
>>>>>         MessageConsumer* consumer; 
>>>>>         long waitMillis; 
>>>>>                 
>>>>> public: 
>>>>>
>>>>>         HelloWorldConsumer( long waitMillis ){ 
>>>>>                 connection = NULL; 
>>>>>     session = NULL; 
>>>>>     destination = NULL; 
>>>>>     consumer = NULL; 
>>>>>     this->waitMillis = waitMillis; 
>>>>>         } 
>>>>>     virtual ~HelloWorldConsumer(){     
>>>>>     cleanup(); 
>>>>>     } 
>>>>>     
>>>>>     virtual void run() { 
>>>>>         
>>>>>         try { 
>>>>>
>>>>>                         string user,passwd,sID; 
>>>>>                         user="default"; 
>>>>>                         passwd=""; 
>>>>>                         sID="lsgID"; 
>>>>>             // Create a ConnectionFactory 
>>>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>>>                 new ActiveMQConnectionFactory(
>>>>> "tcp://localhost:61613",user,passwd,sID); 
>>>>>
>>>>>             // Create a Connection 
>>>>>             connection =
>>>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>>>             delete connectionFactory; 
>>>>>             connection->start(); 
>>>>>             
>>>>>             connection->setExceptionListener(this); 
>>>>>
>>>>>             // Create a Session 
>>>>>             session = connection->createSession(
>>>>> Session::AUTO_ACKNOWLEDGE
>>>>> ); 
>>>>>                         destination = session->createTopic( "mytopic"
>>>>> ); 
>>>>>                         consumer = session->createDurableConsumer(
>>>>> destination , user , "",false); 
>>>>>             
>>>>>             consumer->setMessageListener( this ); 
>>>>>             
>>>>>             Thread::sleep( waitMillis ); 
>>>>>             
>>>>>         } catch (CMSException& e) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>     
>>>>>     virtual void onMessage( const Message* message ){ 
>>>>>     
>>>>>         try 
>>>>>         { 
>>>>>        const TextMessage* textMessage = 
>>>>>                 dynamic_cast< const TextMessage* >( message );

>>>>>             string text = textMessage->getText(); 
>>>>>             printf( "Received: %s\n", text.c_str() ); 
>>>>>         } catch (CMSException& e) { 
>>>>>             e.printStackTrace(); 
>>>>>         } 
>>>>>     } 
>>>>>
>>>>>     virtual void onException( const CMSException& ex ) { 
>>>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>>>     } 
>>>>>     
>>>>> private: 
>>>>>
>>>>>     void cleanup(){ 
>>>>>     
>>>>>                 // Destroy resources. 
>>>>>                 try{                         
>>>>>         if( destination != NULL ) delete destination; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 destination = NULL; 
>>>>>                 
>>>>>                 try{ 
>>>>>             if( consumer != NULL ) delete consumer; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 consumer = NULL; 
>>>>>                 
>>>>>                 // Close open resources. 
>>>>>                 try{ 
>>>>>                         if( session != NULL ) session->close(); 
>>>>>                         if( connection != NULL ) connection->close();

>>>>>                 }catch (CMSException& e) {} 
>>>>>                 
>>>>>         try{ 
>>>>>         if( session != NULL ) delete session; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 session = NULL; 
>>>>>                 
>>>>>                 try{ 
>>>>>         if( connection != NULL ) delete connection; 
>>>>>                 }catch (CMSException& e) {} 
>>>>>                 connection = NULL; 
>>>>>     } 
>>>>> }; 
>>>>>   void Produce() 
>>>>>  { 
>>>>>      HelloWorldProducer producer( 2 ); 
>>>>>      Thread producerThread( &producer ); 
>>>>>      producerThread.start(); 
>>>>>      producerThread.join(); 
>>>>>  } 
>>>>>  void Consumer1() 
>>>>>  { 
>>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>>       Thread consumerThread( &consumer ); 
>>>>>       consumerThread.start(); 
>>>>>       consumerThread.join(); 
>>>>>  } 
>>>>>  void Consumer2() 
>>>>>  { 
>>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>>       Thread consumerThread( &consumer ); 
>>>>>       consumerThread.start(); 
>>>>>       consumerThread.join(); 
>>>>>  } 
>>>>>  int main(int argc, char* argv[]) 
>>>>>  { 
>>>>>       Produce(); 
>>>>>       Consumer1(); 
>>>>>       Consumer2(); 
>>>>>  }
>>>>>
>>>>>     
>>>>>         
>>>>   
>>>>       
>>>
>>>     
>>
>>   
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7535583
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message