activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From sgliu <sheng...@sina.com>
Subject SOS
Date Mon, 20 Nov 2006 00:52:56 GMT

I wish I producer  message A, and then I exit the producer program. Then I
start two consumer program(one is C1,the other is C2) at same time.C1  can
receive A , C2 can receive A.

#include <activemq/concurrent/Thread.h> 
#include <activemq/concurrent/Runnable.h> 
#include <activemq/core/ActiveMQConnectionFactory.h> 
#include <activemq/util/Integer.h> 
#include <cms/Connection.h> 
#include <cms/Session.h> 
#include <cms/TextMessage.h> 
#include <cms/ExceptionListener.h> 
#include <cms/MessageListener.h> 
#include <stdlib.h> 

using namespace activemq::core; 
using namespace activemq::util; 
using namespace activemq::concurrent; 
using namespace cms; 
using namespace std; 

class HelloWorldProducer : public Runnable { 
private: 
        
        Connection* connection; 
        Session* session; 
        Topic* destination; 
        MessageProducer* producer; 
        int numMessages; 

public: 
        
        HelloWorldProducer( int numMessages ){ 
                connection = NULL; 
    session = NULL; 
    destination = NULL; 
    producer = NULL; 
    this->numMessages = numMessages; 
        } 
        
        virtual ~HelloWorldProducer(){ 
                cleanup(); 
        } 
        
    virtual void run() { 
        try { 
                        string user,passwd,sID; 
                        user="default"; 
                        passwd=""; 
                        sID="lsgID"; 
            ActiveMQConnectionFactory* connectionFactory = new
ActiveMQConnectionFactory("tcp://localhost:61613",user,passwd,sID); 

            connection =
connectionFactory->createConnection(user,passwd,sID); 
            connection->start(); 

                        string sss=connection->getClientId(); 
                        cout << sss << endl; 

            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
); 
                        destination = session->createTopic( "mytopic" ); 

                        producer = session->createProducer( destination ); 
            producer->setDeliveryMode( DeliveryMode::PERSISTANT ); 
            
                        producer->setTimeToLive(100000000); 
            string threadIdStr = Integer::toString( Thread::getId() ); 
            
            // Create a messages 
            string text = (string)"Hello world! from thread " + threadIdStr; 
            
            for( int ix=0; ix<numMessages; ++ix ){ 
                    TextMessage* message = session->createTextMessage( text
); 

                                string messageID="messageID"; 
                                message->setCMSExpiration(10000000000); 
                                message->setCMSMessageId(messageID); 

           // Tell the producer to send the message 
           printf( "Sent message from thread %s\n", threadIdStr.c_str() ); 
        producer->send( message ); 
            
            delete message; 
            } 
                        
        }catch ( CMSException& e ) { 
            e.printStackTrace(); 
        } 
    } 
    
private: 

    void cleanup(){ 
    
                        // Destroy resources. 
                        try{                         
            if( destination != NULL ) delete destination; 
                        }catch ( CMSException& e ) {} 
                        destination = NULL; 
                        
                        try{ 
                    if( producer != NULL ) delete producer; 
                        }catch ( CMSException& e ) {} 
                        producer = NULL; 
                        
    // Close open resources. 
    try{ 
    if( session != NULL ) session->close(); 
    if( connection != NULL ) connection->close(); 
                        }catch ( CMSException& e ) {} 

                        try{ 
            if( session != NULL ) delete session; 
                        }catch ( CMSException& e ) {} 
                        session = NULL; 
                        
            try{ 
            if( connection != NULL ) delete connection; 
                        }catch ( CMSException& e ) {} 
    connection = NULL; 
    } 
}; 

class HelloWorldConsumer : public ExceptionListener, 
                           public MessageListener, 
                           public Runnable { 
        
private: 
        
        Connection* connection; 
        Session* session; 
        Topic* destination; 
        MessageConsumer* consumer; 
        long waitMillis; 
                
public: 

        HelloWorldConsumer( long waitMillis ){ 
                connection = NULL; 
    session = NULL; 
    destination = NULL; 
    consumer = NULL; 
    this->waitMillis = waitMillis; 
        } 
    virtual ~HelloWorldConsumer(){     
    cleanup(); 
    } 
    
    virtual void run() { 
        
        try { 

                        string user,passwd,sID; 
                        user="default"; 
                        passwd=""; 
                        sID="lsgID"; 
            // Create a ConnectionFactory 
            ActiveMQConnectionFactory* connectionFactory = 
                new ActiveMQConnectionFactory(
"tcp://localhost:61613",user,passwd,sID); 

            // Create a Connection 
            connection =
connectionFactory->createConnection();//user,passwd,sID); 
            delete connectionFactory; 
            connection->start(); 
            
            connection->setExceptionListener(this); 

            // Create a Session 
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
); 
                        destination = session->createTopic( "mytopic" ); 
                        consumer = session->createDurableConsumer(
destination , user , "",false); 
            
            consumer->setMessageListener( this ); 
            
            Thread::sleep( waitMillis ); 
            
        } catch (CMSException& e) { 
            e.printStackTrace(); 
        } 
    } 
    
    virtual void onMessage( const Message* message ){ 
    
        try 
        { 
       const TextMessage* textMessage = 
                dynamic_cast< const TextMessage* >( message ); 
            string text = textMessage->getText(); 
            printf( "Received: %s\n", text.c_str() ); 
        } catch (CMSException& e) { 
            e.printStackTrace(); 
        } 
    } 

    virtual void onException( const CMSException& ex ) { 
        printf("JMS Exception occured.  Shutting down client.\n"); 
    } 
    
private: 

    void cleanup(){ 
    
                // Destroy resources. 
                try{                         
        if( destination != NULL ) delete destination; 
                }catch (CMSException& e) {} 
                destination = NULL; 
                
                try{ 
            if( consumer != NULL ) delete consumer; 
                }catch (CMSException& e) {} 
                consumer = NULL; 
                
                // Close open resources. 
                try{ 
                        if( session != NULL ) session->close(); 
                        if( connection != NULL ) connection->close(); 
                }catch (CMSException& e) {} 
                
        try{ 
        if( session != NULL ) delete session; 
                }catch (CMSException& e) {} 
                session = NULL; 
                
                try{ 
        if( connection != NULL ) delete connection; 
                }catch (CMSException& e) {} 
                connection = NULL; 
    } 
}; 
  void Produce() 
 { 
     HelloWorldProducer producer( 2 ); 
     Thread producerThread( &producer ); 
     producerThread.start(); 
     producerThread.join(); 
 } 
 void Consumer1() 
 { 
      HelloWorldConsumer consumer( 10000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 void Consumer2() 
 { 
      HelloWorldConsumer consumer( 10000 ); 
      Thread consumerThread( &consumer ); 
      consumerThread.start(); 
      consumerThread.join(); 
 } 
 int main(int argc, char* argv[]) 
 { 
      Produce(); 
      Consumer1(); 
      Consumer2(); 
 }
-- 
View this message in context: http://www.nabble.com/SOS-tf2666164.html#a7435462
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message