activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject Re: SOS
Date Wed, 22 Nov 2006 05:39:31 GMT

Please help me.


sgliu wrote:
> 
> I want to produce message A and then exit the producer program. Then I
> start two consumer programs (C1 and C2) at the same time. I want both C1
> and C2 to receive A.
> 
> #include <activemq/concurrent/Thread.h> 
> #include <activemq/concurrent/Runnable.h> 
> #include <activemq/core/ActiveMQConnectionFactory.h> 
> #include <activemq/util/Integer.h> 
> #include <cms/Connection.h> 
> #include <cms/Session.h> 
> #include <cms/TextMessage.h> 
> #include <cms/ExceptionListener.h> 
> #include <cms/MessageListener.h> 
> #include <stdlib.h> 
> 
> using namespace activemq::core; 
> using namespace activemq::util; 
> using namespace activemq::concurrent; 
> using namespace cms; 
> using namespace std; 
> 
> /**
>  * Publishes a fixed number of persistent text messages to the topic
>  * "mytopic" on the broker at tcp://localhost:61613.
>  *
>  * run() opens the connection, session and producer and sends the
>  * messages; whatever was opened is released by the destructor through
>  * cleanup().
>  */
> class HelloWorldProducer : public Runnable {
> private:
>
>     Connection* connection;
>     Session* session;
>     Topic* destination;
>     MessageProducer* producer;
>     int numMessages;
>
> public:
>
>     /**
>      * @param numMessages number of messages that run() sends.
>      */
>     HelloWorldProducer( int numMessages )
>     :   connection( NULL ),
>         session( NULL ),
>         destination( NULL ),
>         producer( NULL ),
>         numMessages( numMessages ) {
>     }
>
>     virtual ~HelloWorldProducer(){
>         cleanup();
>     }
>
>     /**
>      * Connects to the broker, prints the connection's client id and sends
>      * numMessages text messages. A CMSException is caught here and its
>      * stack trace printed; it does not propagate to the caller.
>      */
>     virtual void run() {
>         try {
>             string user = "default";
>             string passwd = "";
>             string sID = "lsgID";
>
>             // The factory is only needed to create the connection, so it
>             // lives on the stack: it is released on every path, including
>             // the one where createConnection() throws.
>             ActiveMQConnectionFactory connectionFactory(
>                 "tcp://localhost:61613", user, passwd, sID );
>
>             connection = connectionFactory.createConnection( user, passwd, sID );
>             connection->start();
>
>             // Show the client id the connection reports.
>             string clientId = connection->getClientId();
>             cout << clientId << endl;
>
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>             destination = session->createTopic( "mytopic" );
>
>             producer = session->createProducer( destination );
>             producer->setDeliveryMode( DeliveryMode::PERSISTANT );
>             producer->setTimeToLive( 100000000 );
>
>             string threadIdStr = Integer::toString( Thread::getId() );
>
>             // Every message carries the same text.
>             string text = string( "Hello world! from thread " ) + threadIdStr;
>
>             for( int ix = 0; ix < numMessages; ++ix ){
>                 TextMessage* message = session->createTextMessage( text );
>
>                 try {
>                     // NOTE(review): the provider may overwrite the id and
>                     // expiration on send() - confirm against the CMS docs.
>                     string messageID = "messageID";
>                     message->setCMSExpiration( 10000000000 );
>                     message->setCMSMessageId( messageID );
>
>                     // Tell the producer to send the message
>                     printf( "Sent message from thread %s\n", threadIdStr.c_str() );
>                     producer->send( message );
>                 } catch ( ... ) {
>                     // Do not leak the message when a setter or send()
>                     // throws; the outer handler still sees the exception.
>                     delete message;
>                     throw;
>                 }
>
>                 delete message;
>             }
>
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
> private:
>
>     /**
>      * Releases everything run() created. Each step is best-effort: a
>      * CMSException from one step is ignored so the remaining steps still
>      * run. Safe to call when run() never ran or failed part-way, because
>      * every pointer starts out NULL and is reset to NULL here.
>      */
>     void cleanup(){
>
>         // Destroy resources (delete on a NULL pointer is a no-op).
>         try{
>             delete destination;
>         }catch ( CMSException& ) {}
>         destination = NULL;
>
>         try{
>             delete producer;
>         }catch ( CMSException& ) {}
>         producer = NULL;
>
>         // Close open resources. The two closes are guarded separately so
>         // a failing session->close() cannot skip connection->close().
>         try{
>             if( session != NULL ) session->close();
>         }catch ( CMSException& ) {}
>
>         try{
>             if( connection != NULL ) connection->close();
>         }catch ( CMSException& ) {}
>
>         try{
>             delete session;
>         }catch ( CMSException& ) {}
>         session = NULL;
>
>         try{
>             delete connection;
>         }catch ( CMSException& ) {}
>         connection = NULL;
>     }
> };
> 
> /**
>  * Durable subscriber on the topic "mytopic" at tcp://localhost:61613 that
>  * prints the text of every TextMessage it is handed.
>  *
>  * run() sets up the subscription and then sleeps for waitMillis while
>  * messages arrive through onMessage(); whatever was opened is released by
>  * the destructor through cleanup().
>  */
> class HelloWorldConsumer : public ExceptionListener,
>                            public MessageListener,
>                            public Runnable {
>
> private:
>
>     Connection* connection;
>     Session* session;
>     Topic* destination;
>     MessageConsumer* consumer;
>     long waitMillis;
>
> public:
>
>     /**
>      * @param waitMillis value passed to Thread::sleep() in run(), i.e. how
>      *                   long the subscription is kept open.
>      */
>     HelloWorldConsumer( long waitMillis )
>     :   connection( NULL ),
>         session( NULL ),
>         destination( NULL ),
>         consumer( NULL ),
>         waitMillis( waitMillis ) {
>     }
>
>     virtual ~HelloWorldConsumer(){
>         cleanup();
>     }
>
>     /**
>      * Connects to the broker, registers this object as exception and
>      * message listener, creates the durable consumer and then sleeps for
>      * waitMillis. A CMSException is caught here and its stack trace
>      * printed; it does not propagate to the caller.
>      */
>     virtual void run() {
>
>         try {
>             string user = "default";
>             string passwd = "";
>             string sID = "lsgID";
>
>             // Create a ConnectionFactory. It is only needed to create the
>             // connection, so it lives on the stack and is released on
>             // every path, including when createConnection() throws.
>             ActiveMQConnectionFactory connectionFactory(
>                 "tcp://localhost:61613", user, passwd, sID );
>
>             // Create a Connection
>             connection = connectionFactory.createConnection();
>             connection->start();
>
>             connection->setExceptionListener( this );
>
>             // Create a Session
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>             destination = session->createTopic( "mytopic" );
>
>             // The user name doubles as the durable subscription name.
>             consumer = session->createDurableConsumer(
>                 destination, user, "", false );
>
>             consumer->setMessageListener( this );
>
>             // Keep the subscription alive while onMessage() is called.
>             Thread::sleep( waitMillis );
>
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
>     /**
>      * Prints the text of a received TextMessage. Anything that is not a
>      * TextMessage (including a NULL pointer) is reported and skipped
>      * instead of being dereferenced.
>      */
>     virtual void onMessage( const Message* message ){
>
>         try {
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>
>             // dynamic_cast yields NULL for other message types.
>             if( textMessage == NULL ){
>                 printf( "Received a non-text message; ignoring it.\n" );
>                 return;
>             }
>
>             string text = textMessage->getText();
>             printf( "Received: %s\n", text.c_str() );
>         } catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
>
>     /**
>      * Exception-listener callback: only prints a notice.
>      */
>     virtual void onException( const CMSException& ex ) {
>         printf("JMS Exception occured.  Shutting down client.\n");
>     }
>
> private:
>
>     /**
>      * Releases everything run() created. Each step is best-effort: a
>      * CMSException from one step is ignored so the remaining steps still
>      * run. Safe to call when run() never ran or failed part-way, because
>      * every pointer starts out NULL and is reset to NULL here.
>      */
>     void cleanup(){
>
>         // Destroy resources (delete on a NULL pointer is a no-op).
>         try{
>             delete destination;
>         }catch ( CMSException& ) {}
>         destination = NULL;
>
>         try{
>             delete consumer;
>         }catch ( CMSException& ) {}
>         consumer = NULL;
>
>         // Close open resources. The two closes are guarded separately so
>         // a failing session->close() cannot skip connection->close().
>         try{
>             if( session != NULL ) session->close();
>         }catch ( CMSException& ) {}
>
>         try{
>             if( connection != NULL ) connection->close();
>         }catch ( CMSException& ) {}
>
>         try{
>             delete session;
>         }catch ( CMSException& ) {}
>         session = NULL;
>
>         try{
>             delete connection;
>         }catch ( CMSException& ) {}
>         connection = NULL;
>     }
> };
>   // Runs a HelloWorldProducer configured for 2 messages on its own thread
>   // and blocks until that thread has finished.
>   void Produce()
>   {
>       HelloWorldProducer helloProducer( 2 );
>
>       Thread worker( &helloProducer );
>       worker.start();
>       worker.join();
>   }
>  // Runs a HelloWorldConsumer with a wait value of 10000 on its own thread
>  // and blocks until that thread has finished.
>  void Consumer1()
>  {
>      HelloWorldConsumer firstConsumer( 10000 );
>
>      Thread worker( &firstConsumer );
>      worker.start();
>      worker.join();
>  }
>  // Same as Consumer1(): runs another HelloWorldConsumer with a wait value
>  // of 10000 on its own thread and blocks until that thread has finished.
>  void Consumer2()
>  {
>      HelloWorldConsumer secondConsumer( 10000 );
>
>      Thread worker( &secondConsumer );
>      worker.start();
>      worker.join();
>  }
>  // Entry point: runs the producer to completion, then the first consumer,
>  // then the second one - strictly one after the other.
>  int main(int argc, char* argv[])
>  {
>      Produce();
>
>      Consumer1();
>      Consumer2();
>
>      return 0;
>  }
> 

-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7485492
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message