activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject Re: SOS
Date Thu, 23 Nov 2006 05:38:47 GMT

Send first,then receive.In topic.setCMSExpiration() function?
What can I do?



Adrian Co wrote:
> 
> Try subscribing the consumers first before sending messages.
> 
> int main(int argc, char* argv[]) 
>  { 
>       Consumer1(); 
>       Consumer2(); 
> 
>       Produce(); 
> 
>  }
> 
> 
> 
> sgliu wrote:
>> Please help me.
>>
>>
>> sgliu wrote:
>>   
>>> I wish I producer  message A, and then I exit the producer program. Then
>>> I
>>> start two consumer program(one is C1,the other is C2) at same time.C1 
>>> can
>>> receive A , C2 can receive A.
>>>
>>> #include <activemq/concurrent/Thread.h> 
>>> #include <activemq/concurrent/Runnable.h> 
>>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>>> #include <activemq/util/Integer.h> 
>>> #include <cms/Connection.h> 
>>> #include <cms/Session.h> 
>>> #include <cms/TextMessage.h> 
>>> #include <cms/ExceptionListener.h> 
>>> #include <cms/MessageListener.h> 
>>> #include <stdlib.h> 
>>>
>>> using namespace activemq::core; 
>>> using namespace activemq::util; 
>>> using namespace activemq::concurrent; 
>>> using namespace cms; 
>>> using namespace std; 
>>>
>>> class HelloWorldProducer : public Runnable { 
>>> private: 
>>>         
>>>         Connection* connection; 
>>>         Session* session; 
>>>         Topic* destination; 
>>>         MessageProducer* producer; 
>>>         int numMessages; 
>>>
>>> public: 
>>>         
>>>         HelloWorldProducer( int numMessages ){ 
>>>                 connection = NULL; 
>>>     session = NULL; 
>>>     destination = NULL; 
>>>     producer = NULL; 
>>>     this->numMessages = numMessages; 
>>>         } 
>>>         
>>>         virtual ~HelloWorldProducer(){ 
>>>                 cleanup(); 
>>>         } 
>>>         
>>>     virtual void run() { 
>>>         try { 
>>>                         string user,passwd,sID; 
>>>                         user="default"; 
>>>                         passwd=""; 
>>>                         sID="lsgID"; 
>>>             ActiveMQConnectionFactory* connectionFactory = new
>>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>>
>>>             connection =
>>> connectionFactory->createConnection(user,passwd,sID); 
>>>             connection->start(); 
>>>
>>>                         string sss=connection->getClientId(); 
>>>                         cout << sss << endl; 
>>>
>>>             session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> ); 
>>>                         destination = session->createTopic( "mytopic" ); 
>>>
>>>                         producer = session->createProducer( destination
>>> ); 
>>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>>             
>>>                         producer->setTimeToLive(100000000); 
>>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>>             
>>>             // Create a messages 
>>>             string text = (string)"Hello world! from thread " +
>>> threadIdStr; 
>>>             
>>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>>                     TextMessage* message = session->createTextMessage(
>>> text ); 
>>>
>>>                                 string messageID="messageID"; 
>>>                                 message->setCMSExpiration(10000000000); 
>>>                                 message->setCMSMessageId(messageID); 
>>>
>>>            // Tell the producer to send the message 
>>>            printf( "Sent message from thread %s\n", threadIdStr.c_str()
>>> ); 
>>>         producer->send( message ); 
>>>             
>>>             delete message; 
>>>             } 
>>>                         
>>>         }catch ( CMSException& e ) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>     
>>> private: 
>>>
>>>     void cleanup(){ 
>>>     
>>>                         // Destroy resources. 
>>>                         try{                         
>>>             if( destination != NULL ) delete destination; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         destination = NULL; 
>>>                         
>>>                         try{ 
>>>                     if( producer != NULL ) delete producer; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         producer = NULL; 
>>>                         
>>>     // Close open resources. 
>>>     try{ 
>>>     if( session != NULL ) session->close(); 
>>>     if( connection != NULL ) connection->close(); 
>>>                         }catch ( CMSException& e ) {} 
>>>
>>>                         try{ 
>>>             if( session != NULL ) delete session; 
>>>                         }catch ( CMSException& e ) {} 
>>>                         session = NULL; 
>>>                         
>>>             try{ 
>>>             if( connection != NULL ) delete connection; 
>>>                         }catch ( CMSException& e ) {} 
>>>     connection = NULL; 
>>>     } 
>>> }; 
>>>
>>> class HelloWorldConsumer : public ExceptionListener, 
>>>                            public MessageListener, 
>>>                            public Runnable { 
>>>         
>>> private: 
>>>         
>>>         Connection* connection; 
>>>         Session* session; 
>>>         Topic* destination; 
>>>         MessageConsumer* consumer; 
>>>         long waitMillis; 
>>>                 
>>> public: 
>>>
>>>         HelloWorldConsumer( long waitMillis ){ 
>>>                 connection = NULL; 
>>>     session = NULL; 
>>>     destination = NULL; 
>>>     consumer = NULL; 
>>>     this->waitMillis = waitMillis; 
>>>         } 
>>>     virtual ~HelloWorldConsumer(){     
>>>     cleanup(); 
>>>     } 
>>>     
>>>     virtual void run() { 
>>>         
>>>         try { 
>>>
>>>                         string user,passwd,sID; 
>>>                         user="default"; 
>>>                         passwd=""; 
>>>                         sID="lsgID"; 
>>>             // Create a ConnectionFactory 
>>>             ActiveMQConnectionFactory* connectionFactory = 
>>>                 new ActiveMQConnectionFactory(
>>> "tcp://localhost:61613",user,passwd,sID); 
>>>
>>>             // Create a Connection 
>>>             connection =
>>> connectionFactory->createConnection();//user,passwd,sID); 
>>>             delete connectionFactory; 
>>>             connection->start(); 
>>>             
>>>             connection->setExceptionListener(this); 
>>>
>>>             // Create a Session 
>>>             session = connection->createSession(
>>> Session::AUTO_ACKNOWLEDGE
>>> ); 
>>>                         destination = session->createTopic( "mytopic" ); 
>>>                         consumer = session->createDurableConsumer(
>>> destination , user , "",false); 
>>>             
>>>             consumer->setMessageListener( this ); 
>>>             
>>>             Thread::sleep( waitMillis ); 
>>>             
>>>         } catch (CMSException& e) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>     
>>>     virtual void onMessage( const Message* message ){ 
>>>     
>>>         try 
>>>         { 
>>>        const TextMessage* textMessage = 
>>>                 dynamic_cast< const TextMessage* >( message ); 
>>>             string text = textMessage->getText(); 
>>>             printf( "Received: %s\n", text.c_str() ); 
>>>         } catch (CMSException& e) { 
>>>             e.printStackTrace(); 
>>>         } 
>>>     } 
>>>
>>>     virtual void onException( const CMSException& ex ) { 
>>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>>     } 
>>>     
>>> private: 
>>>
>>>     void cleanup(){ 
>>>     
>>>                 // Destroy resources. 
>>>                 try{                         
>>>         if( destination != NULL ) delete destination; 
>>>                 }catch (CMSException& e) {} 
>>>                 destination = NULL; 
>>>                 
>>>                 try{ 
>>>             if( consumer != NULL ) delete consumer; 
>>>                 }catch (CMSException& e) {} 
>>>                 consumer = NULL; 
>>>                 
>>>                 // Close open resources. 
>>>                 try{ 
>>>                         if( session != NULL ) session->close(); 
>>>                         if( connection != NULL ) connection->close(); 
>>>                 }catch (CMSException& e) {} 
>>>                 
>>>         try{ 
>>>         if( session != NULL ) delete session; 
>>>                 }catch (CMSException& e) {} 
>>>                 session = NULL; 
>>>                 
>>>                 try{ 
>>>         if( connection != NULL ) delete connection; 
>>>                 }catch (CMSException& e) {} 
>>>                 connection = NULL; 
>>>     } 
>>> }; 
>>>   void Produce() 
>>>  { 
>>>      HelloWorldProducer producer( 2 ); 
>>>      Thread producerThread( &producer ); 
>>>      producerThread.start(); 
>>>      producerThread.join(); 
>>>  } 
>>>  void Consumer1() 
>>>  { 
>>>       HelloWorldConsumer consumer( 10000 ); 
>>>       Thread consumerThread( &consumer ); 
>>>       consumerThread.start(); 
>>>       consumerThread.join(); 
>>>  } 
>>>  void Consumer2() 
>>>  { 
>>>       HelloWorldConsumer consumer( 10000 ); 
>>>       Thread consumerThread( &consumer ); 
>>>       consumerThread.start(); 
>>>       consumerThread.join(); 
>>>  } 
>>>  int main(int argc, char* argv[]) 
>>>  { 
>>>       Produce(); 
>>>       Consumer1(); 
>>>       Consumer2(); 
>>>  }
>>>
>>>     
>>
>>   
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7500819
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message