activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adrian Co <...@exist.com>
Subject Re: SOS
Date Fri, 24 Nov 2006 10:12:16 GMT
I do not understand what you want. If you want to send first, then still 
receive later using topics, you could try using retroactive consumers: 
http://www.activemq.org/site/retroactive-consumer.html or create durable 
subscribers for more reliable messaging,

sgliu wrote:
> Send first,then receive.In topic.setCMSExpiration() function?
> What can I do?
>
>
>
> Adrian Co wrote:
>   
>> Try subscribing the consumers first before sending messages.
>>
>> int main(int argc, char* argv[]) 
>>  { 
>>       Consumer1(); 
>>       Consumer2(); 
>>
>>       Produce(); 
>>
>>  }
>>
>>
>>
>> sgliu wrote:
>>     
>>> Please help me.
>>>
>>>
>>> sgliu wrote:
>>>   
>>>       
>>>> I wish I producer  message A, and then I exit the producer program. Then
>>>> I
>>>> start two consumer program(one is C1,the other is C2) at same time.C1 
>>>> can
>>>> receive A , C2 can receive A.
>>>>
>>>> #include <activemq/concurrent/Thread.h> 
>>>> #include <activemq/concurrent/Runnable.h> 
>>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>>> #include <activemq/util/Integer.h> 
>>>> #include <cms/Connection.h> 
>>>> #include <cms/Session.h> 
>>>> #include <cms/TextMessage.h> 
>>>> #include <cms/ExceptionListener.h> 
>>>> #include <cms/MessageListener.h> 
>>>> #include <stdlib.h> 
>>>>
>>>> using namespace activemq::core; 
>>>> using namespace activemq::util; 
>>>> using namespace activemq::concurrent; 
>>>> using namespace cms; 
>>>> using namespace std; 
>>>>
>>>> class HelloWorldProducer : public Runnable { 
>>>> private: 
>>>>         
>>>>         Connection* connection; 
>>>>         Session* session; 
>>>>         Topic* destination; 
>>>>         MessageProducer* producer; 
>>>>         int numMessages; 
>>>>
>>>> public: 
>>>>         
>>>>         HelloWorldProducer( int numMessages ){ 
>>>>                 connection = NULL; 
>>>>     session = NULL; 
>>>>     destination = NULL; 
>>>>     producer = NULL; 
>>>>     this->numMessages = numMessages; 
>>>>         } 
>>>>         
>>>>         virtual ~HelloWorldProducer(){ 
>>>>                 cleanup(); 
>>>>         } 
>>>>         
>>>>     virtual void run() { 
>>>>         try { 
>>>>                         string user,passwd,sID; 
>>>>                         user="default"; 
>>>>                         passwd=""; 
>>>>                         sID="lsgID"; 
>>>>             ActiveMQConnectionFactory* connectionFactory = new
>>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>>
>>>>             connection =
>>>> connectionFactory->createConnection(user,passwd,sID); 
>>>>             connection->start(); 
>>>>
>>>>                         string sss=connection->getClientId(); 
>>>>                         cout << sss << endl; 
>>>>
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> ); 
>>>>                         destination = session->createTopic( "mytopic"
); 
>>>>
>>>>                         producer = session->createProducer( destination
>>>> ); 
>>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>>>             
>>>>                         producer->setTimeToLive(100000000); 
>>>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>>>             
>>>>             // Create a messages 
>>>>             string text = (string)"Hello world! from thread " +
>>>> threadIdStr; 
>>>>             
>>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>>                     TextMessage* message = session->createTextMessage(
>>>> text ); 
>>>>
>>>>                                 string messageID="messageID"; 
>>>>                                 message->setCMSExpiration(10000000000);

>>>>                                 message->setCMSMessageId(messageID); 
>>>>
>>>>            // Tell the producer to send the message 
>>>>            printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>>> ); 
>>>>         producer->send( message ); 
>>>>             
>>>>             delete message; 
>>>>             } 
>>>>                         
>>>>         }catch ( CMSException& e ) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>     
>>>> private: 
>>>>
>>>>     void cleanup(){ 
>>>>     
>>>>                         // Destroy resources. 
>>>>                         try{                         
>>>>             if( destination != NULL ) delete destination; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         destination = NULL; 
>>>>                         
>>>>                         try{ 
>>>>                     if( producer != NULL ) delete producer; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         producer = NULL; 
>>>>                         
>>>>     // Close open resources. 
>>>>     try{ 
>>>>     if( session != NULL ) session->close(); 
>>>>     if( connection != NULL ) connection->close(); 
>>>>                         }catch ( CMSException& e ) {} 
>>>>
>>>>                         try{ 
>>>>             if( session != NULL ) delete session; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>                         session = NULL; 
>>>>                         
>>>>             try{ 
>>>>             if( connection != NULL ) delete connection; 
>>>>                         }catch ( CMSException& e ) {} 
>>>>     connection = NULL; 
>>>>     } 
>>>> }; 
>>>>
>>>> class HelloWorldConsumer : public ExceptionListener, 
>>>>                            public MessageListener, 
>>>>                            public Runnable { 
>>>>         
>>>> private: 
>>>>         
>>>>         Connection* connection; 
>>>>         Session* session; 
>>>>         Topic* destination; 
>>>>         MessageConsumer* consumer; 
>>>>         long waitMillis; 
>>>>                 
>>>> public: 
>>>>
>>>>         HelloWorldConsumer( long waitMillis ){ 
>>>>                 connection = NULL; 
>>>>     session = NULL; 
>>>>     destination = NULL; 
>>>>     consumer = NULL; 
>>>>     this->waitMillis = waitMillis; 
>>>>         } 
>>>>     virtual ~HelloWorldConsumer(){     
>>>>     cleanup(); 
>>>>     } 
>>>>     
>>>>     virtual void run() { 
>>>>         
>>>>         try { 
>>>>
>>>>                         string user,passwd,sID; 
>>>>                         user="default"; 
>>>>                         passwd=""; 
>>>>                         sID="lsgID"; 
>>>>             // Create a ConnectionFactory 
>>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>>                 new ActiveMQConnectionFactory(
>>>> "tcp://localhost:61613",user,passwd,sID); 
>>>>
>>>>             // Create a Connection 
>>>>             connection =
>>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>>             delete connectionFactory; 
>>>>             connection->start(); 
>>>>             
>>>>             connection->setExceptionListener(this); 
>>>>
>>>>             // Create a Session 
>>>>             session = connection->createSession(
>>>> Session::AUTO_ACKNOWLEDGE
>>>> ); 
>>>>                         destination = session->createTopic( "mytopic"
); 
>>>>                         consumer = session->createDurableConsumer(
>>>> destination , user , "",false); 
>>>>             
>>>>             consumer->setMessageListener( this ); 
>>>>             
>>>>             Thread::sleep( waitMillis ); 
>>>>             
>>>>         } catch (CMSException& e) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>     
>>>>     virtual void onMessage( const Message* message ){ 
>>>>     
>>>>         try 
>>>>         { 
>>>>        const TextMessage* textMessage = 
>>>>                 dynamic_cast< const TextMessage* >( message ); 
>>>>             string text = textMessage->getText(); 
>>>>             printf( "Received: %s\n", text.c_str() ); 
>>>>         } catch (CMSException& e) { 
>>>>             e.printStackTrace(); 
>>>>         } 
>>>>     } 
>>>>
>>>>     virtual void onException( const CMSException& ex ) { 
>>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>>     } 
>>>>     
>>>> private: 
>>>>
>>>>     void cleanup(){ 
>>>>     
>>>>                 // Destroy resources. 
>>>>                 try{                         
>>>>         if( destination != NULL ) delete destination; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 destination = NULL; 
>>>>                 
>>>>                 try{ 
>>>>             if( consumer != NULL ) delete consumer; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 consumer = NULL; 
>>>>                 
>>>>                 // Close open resources. 
>>>>                 try{ 
>>>>                         if( session != NULL ) session->close(); 
>>>>                         if( connection != NULL ) connection->close();

>>>>                 }catch (CMSException& e) {} 
>>>>                 
>>>>         try{ 
>>>>         if( session != NULL ) delete session; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 session = NULL; 
>>>>                 
>>>>                 try{ 
>>>>         if( connection != NULL ) delete connection; 
>>>>                 }catch (CMSException& e) {} 
>>>>                 connection = NULL; 
>>>>     } 
>>>> }; 
>>>>   void Produce() 
>>>>  { 
>>>>      HelloWorldProducer producer( 2 ); 
>>>>      Thread producerThread( &producer ); 
>>>>      producerThread.start(); 
>>>>      producerThread.join(); 
>>>>  } 
>>>>  void Consumer1() 
>>>>  { 
>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>       Thread consumerThread( &consumer ); 
>>>>       consumerThread.start(); 
>>>>       consumerThread.join(); 
>>>>  } 
>>>>  void Consumer2() 
>>>>  { 
>>>>       HelloWorldConsumer consumer( 10000 ); 
>>>>       Thread consumerThread( &consumer ); 
>>>>       consumerThread.start(); 
>>>>       consumerThread.join(); 
>>>>  } 
>>>>  int main(int argc, char* argv[]) 
>>>>  { 
>>>>       Produce(); 
>>>>       Consumer1(); 
>>>>       Consumer2(); 
>>>>  }
>>>>
>>>>     
>>>>         
>>>   
>>>       
>>
>>     
>
>   


Mime
View raw message