activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Adrian Co <...@exist.com>
Subject Re: SOS
Date Wed, 22 Nov 2006 06:09:38 GMT
Try subscribing the consumers first before sending messages.

int main(int argc, char* argv[]) 
 { 
      Consumer1(); 
      Consumer2(); 

      Produce(); 

 }



sgliu wrote:
> Please help me.
>
>
> sgliu wrote:
>   
>> I wish I producer  message A, and then I exit the producer program. Then I
>> start two consumer program(one is C1,the other is C2) at same time.C1  can
>> receive A , C2 can receive A.
>>
>> #include <activemq/concurrent/Thread.h> 
>> #include <activemq/concurrent/Runnable.h> 
>> #include <activemq/core/ActiveMQConnectionFactory.h> 
>> #include <activemq/util/Integer.h> 
>> #include <cms/Connection.h> 
>> #include <cms/Session.h> 
>> #include <cms/TextMessage.h> 
>> #include <cms/ExceptionListener.h> 
>> #include <cms/MessageListener.h> 
>> #include <stdlib.h> 
>>
>> using namespace activemq::core; 
>> using namespace activemq::util; 
>> using namespace activemq::concurrent; 
>> using namespace cms; 
>> using namespace std; 
>>
>> class HelloWorldProducer : public Runnable { 
>> private: 
>>         
>>         Connection* connection; 
>>         Session* session; 
>>         Topic* destination; 
>>         MessageProducer* producer; 
>>         int numMessages; 
>>
>> public: 
>>         
>>         HelloWorldProducer( int numMessages ){ 
>>                 connection = NULL; 
>>     session = NULL; 
>>     destination = NULL; 
>>     producer = NULL; 
>>     this->numMessages = numMessages; 
>>         } 
>>         
>>         virtual ~HelloWorldProducer(){ 
>>                 cleanup(); 
>>         } 
>>         
>>     virtual void run() { 
>>         try { 
>>                         string user,passwd,sID; 
>>                         user="default"; 
>>                         passwd=""; 
>>                         sID="lsgID"; 
>>             ActiveMQConnectionFactory* connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 
>>
>>             connection =
>> connectionFactory->createConnection(user,passwd,sID); 
>>             connection->start(); 
>>
>>                         string sss=connection->getClientId(); 
>>                         cout << sss << endl; 
>>
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> ); 
>>                         destination = session->createTopic( "mytopic" ); 
>>
>>                         producer = session->createProducer( destination ); 
>>             producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
>>             
>>                         producer->setTimeToLive(100000000); 
>>             string threadIdStr = Integer::toString( Thread::getId() ); 
>>             
>>             // Create a messages 
>>             string text = (string)"Hello world! from thread " +
>> threadIdStr; 
>>             
>>             for( int ix=0; ix<numMessages; ++ix ){ 
>>                     TextMessage* message = session->createTextMessage(
>> text ); 
>>
>>                                 string messageID="messageID"; 
>>                                 message->setCMSExpiration(10000000000); 
>>                                 message->setCMSMessageId(messageID); 
>>
>>            // Tell the producer to send the message 
>>            printf( "Sent message from thread %s\n", threadIdStr.c_str() ); 
>>         producer->send( message ); 
>>             
>>             delete message; 
>>             } 
>>                         
>>         }catch ( CMSException& e ) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>     
>> private: 
>>
>>     void cleanup(){ 
>>     
>>                         // Destroy resources. 
>>                         try{                         
>>             if( destination != NULL ) delete destination; 
>>                         }catch ( CMSException& e ) {} 
>>                         destination = NULL; 
>>                         
>>                         try{ 
>>                     if( producer != NULL ) delete producer; 
>>                         }catch ( CMSException& e ) {} 
>>                         producer = NULL; 
>>                         
>>     // Close open resources. 
>>     try{ 
>>     if( session != NULL ) session->close(); 
>>     if( connection != NULL ) connection->close(); 
>>                         }catch ( CMSException& e ) {} 
>>
>>                         try{ 
>>             if( session != NULL ) delete session; 
>>                         }catch ( CMSException& e ) {} 
>>                         session = NULL; 
>>                         
>>             try{ 
>>             if( connection != NULL ) delete connection; 
>>                         }catch ( CMSException& e ) {} 
>>     connection = NULL; 
>>     } 
>> }; 
>>
>> class HelloWorldConsumer : public ExceptionListener, 
>>                            public MessageListener, 
>>                            public Runnable { 
>>         
>> private: 
>>         
>>         Connection* connection; 
>>         Session* session; 
>>         Topic* destination; 
>>         MessageConsumer* consumer; 
>>         long waitMillis; 
>>                 
>> public: 
>>
>>         HelloWorldConsumer( long waitMillis ){ 
>>                 connection = NULL; 
>>     session = NULL; 
>>     destination = NULL; 
>>     consumer = NULL; 
>>     this->waitMillis = waitMillis; 
>>         } 
>>     virtual ~HelloWorldConsumer(){     
>>     cleanup(); 
>>     } 
>>     
>>     virtual void run() { 
>>         
>>         try { 
>>
>>                         string user,passwd,sID; 
>>                         user="default"; 
>>                         passwd=""; 
>>                         sID="lsgID"; 
>>             // Create a ConnectionFactory 
>>             ActiveMQConnectionFactory* connectionFactory = 
>>                 new ActiveMQConnectionFactory(
>> "tcp://localhost:61613",user,passwd,sID); 
>>
>>             // Create a Connection 
>>             connection =
>> connectionFactory->createConnection();//user,passwd,sID); 
>>             delete connectionFactory; 
>>             connection->start(); 
>>             
>>             connection->setExceptionListener(this); 
>>
>>             // Create a Session 
>>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
>> ); 
>>                         destination = session->createTopic( "mytopic" ); 
>>                         consumer = session->createDurableConsumer(
>> destination , user , "",false); 
>>             
>>             consumer->setMessageListener( this ); 
>>             
>>             Thread::sleep( waitMillis ); 
>>             
>>         } catch (CMSException& e) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>     
>>     virtual void onMessage( const Message* message ){ 
>>     
>>         try 
>>         { 
>>        const TextMessage* textMessage = 
>>                 dynamic_cast< const TextMessage* >( message ); 
>>             string text = textMessage->getText(); 
>>             printf( "Received: %s\n", text.c_str() ); 
>>         } catch (CMSException& e) { 
>>             e.printStackTrace(); 
>>         } 
>>     } 
>>
>>     virtual void onException( const CMSException& ex ) { 
>>         printf("JMS Exception occured.  Shutting down client.\n"); 
>>     } 
>>     
>> private: 
>>
>>     void cleanup(){ 
>>     
>>                 // Destroy resources. 
>>                 try{                         
>>         if( destination != NULL ) delete destination; 
>>                 }catch (CMSException& e) {} 
>>                 destination = NULL; 
>>                 
>>                 try{ 
>>             if( consumer != NULL ) delete consumer; 
>>                 }catch (CMSException& e) {} 
>>                 consumer = NULL; 
>>                 
>>                 // Close open resources. 
>>                 try{ 
>>                         if( session != NULL ) session->close(); 
>>                         if( connection != NULL ) connection->close(); 
>>                 }catch (CMSException& e) {} 
>>                 
>>         try{ 
>>         if( session != NULL ) delete session; 
>>                 }catch (CMSException& e) {} 
>>                 session = NULL; 
>>                 
>>                 try{ 
>>         if( connection != NULL ) delete connection; 
>>                 }catch (CMSException& e) {} 
>>                 connection = NULL; 
>>     } 
>> }; 
>>   void Produce() 
>>  { 
>>      HelloWorldProducer producer( 2 ); 
>>      Thread producerThread( &producer ); 
>>      producerThread.start(); 
>>      producerThread.join(); 
>>  } 
>>  void Consumer1() 
>>  { 
>>       HelloWorldConsumer consumer( 10000 ); 
>>       Thread consumerThread( &consumer ); 
>>       consumerThread.start(); 
>>       consumerThread.join(); 
>>  } 
>>  void Consumer2() 
>>  { 
>>       HelloWorldConsumer consumer( 10000 ); 
>>       Thread consumerThread( &consumer ); 
>>       consumerThread.start(); 
>>       consumerThread.join(); 
>>  } 
>>  int main(int argc, char* argv[]) 
>>  { 
>>       Produce(); 
>>       Consumer1(); 
>>       Consumer2(); 
>>  }
>>
>>     
>
>   


Mime
View raw message