activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From crazy4venu <crazy4v...@gmail.com>
Subject onMessage event not raised
Date Tue, 09 Sep 2008 11:43:53 GMT

Dear all, 

I am writing a trading client, and I cannot work out why the onMessage
event is never raised.

I did the following to create and use the ActiveMQ consumer and
producer.

    // Producer setup: connects to the local broker, opens an auto-acknowledge
    // session and creates a non-persistent producer on a queue.
    try {
        // Create a ConnectionFactory for the broker on localhost.
        connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");

        // NOTE(review): these C-style casts strip const from the LPCTSTR.
        // Presumably usr/pswd are CString in an ANSI/MBCS build; in a Unicode
        // build LPCTSTR is a wide-character pointer and the cast would hand
        // the wrong bytes to the factory -- confirm the project's character set.
        connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
        connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx

        connection = connectionFactory->createConnection();
        connection->start();

        // Create a Session
        session = connection->createSession( Session::AUTO_ACKNOWLEDGE );

        // NOTE(review): this is the queue "ors-commands" (hyphen). The
        // HelloWorldConsumer class in this message listens on the queue
        // "ors_commands" (underscore) or the topic "ors_messages"; a consumer
        // only receives what is sent to the same destination -- confirm the
        // names and the queue/topic choice match.
        destination = session->createQueue("ors-commands");

        AfxMessageBox("session created by producer !");

        // Create a MessageProducer from the Session to the Topic or Queue
        producer = session->createProducer( destination );
        producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
    } catch ( CMSException& e ) {
        e.printStackTrace();
        MessageBox("ActiveMQConnectionFactory Error !");
    }
                //AfxBeginThread(startApp,this);




creating consumer
----------------------
        // Consumer start-up: builds a HelloWorldConsumer, runs it on its own
        // thread, waits for it to report ready and then joins the thread.
        std::string brokerURI = "tcp://localhost:61616";
               
        // true makes HelloWorldConsumer create a topic ("ors_messages")
        // instead of a queue ("ors_commands").
        // NOTE(review): the producer snippet in this message sends to a queue
        // named "ors-commands"; confirm the destination type and name match.
        bool useTopics = true;
        bool sessionTransacted = false;
        // Passed as numMessages; HelloWorldConsumer uses it as the initial
        // count of its doneLatch.
        int numMessages = 20000;

try {

        // NOTE(review): consumer is a local of this try block, so its
        // destructor (which calls cleanup() and closes the connection) runs
        // as soon as the block exits -- no messages can be delivered after
        // that point. Presumably it needs to outlive this scope; verify.
        HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
sessionTransacted );

        // Start the consumer thread.
        Thread consumerThread( &consumer );
        consumerThread.start();

        // Wait for the consumer to indicate that its ready to go.
        // Blocks until the consumer's ready latch is counted down.
        consumer.waitUntilReady();
        // Blocks until HelloWorldConsumer::run() has returned.
        consumerThread.join();


        // Shown only after the consumer thread has already finished.
        AfxMessageBox("consumer waiting for the messages !");
}
catch ( CMSException& e )
{
e.printStackTrace();
AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
}


and the consumer class is:


class HelloWorldConsumer : public ExceptionListener,
 public MessageListener,
 public Runnable {

private:

 CountDownLatch latch;
 CountDownLatch doneLatch;
 Connection* connection;
 Session* session;
 Destination* destination;
 MessageConsumer* consumer;
 long waitMillis;
 bool useTopic;
 bool sessionTransacted;
 std::string brokerURI;

public:

 HelloWorldConsumer( const std::string& brokerURI,
  long numMessages,
  bool useTopic = false,
  bool sessionTransacted = false,
  long waitMillis = 30000 )
  : latch(1), doneLatch(numMessages){
   this->connection = NULL;
   this->session = NULL;
   this->destination = NULL;
   this->consumer = NULL;
   this->waitMillis = waitMillis;
   this->useTopic = useTopic;
   this->sessionTransacted = sessionTransacted;
   this->brokerURI = brokerURI;
 }
 virtual ~HelloWorldConsumer(){
  cleanup();
 }

 void waitUntilReady() {
  latch.await();
 }

 virtual void run() {

  ConnectionFactory* connectionFactory = NULL;

  try {

   // Create a ConnectionFactory
   connectionFactory =
    ConnectionFactory::createCMSConnectionFactory( brokerURI );

   // Create a Connection
   connection = connectionFactory->createConnection();

   delete connectionFactory;
   connectionFactory = NULL;

   connection->start();

   connection->setExceptionListener(this);

   // Create a Session
   if( this->sessionTransacted == true ) {
    session = connection->createSession( Session::SESSION_TRANSACTED );
   } else {
    session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
   }

   // Create the destination (Topic or Queue)
   if( useTopic ) {
    destination = session->createTopic( "ors_messages" );
   } else {
    destination = session->createQueue( "ors_commands" );
   }

   // Create a MessageConsumer from the Session to the Topic or Queue
   consumer = session->createConsumer( destination );

   consumer->setMessageListener( this );

   std::cout.flush();
   std::cerr.flush();

   // Indicate we are ready for messages.
   //latch.countDown();

   // Wait while asynchronous messages come in.
   //doneLatch.await( waitMillis );

  } catch (CMSException& e) {

   // Indicate we are ready for messages.
   latch.countDown();

   delete connectionFactory;
   connectionFactory = NULL;

   e.printStackTrace();
  }
 }

 // Called from the consumer since this class is a registered
MessageListener.
 virtual void onMessage( const Message* message ){

  static int count = 0;

  AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");

  try
  {
   count++;
   const TextMessage* textMessage =
    dynamic_cast< const TextMessage* >( message );
   string text = "";

   if( textMessage != NULL ) {
    text = textMessage->getText();
   } else {
    text = "NOT A TEXTMESSAGE!";
   }

  // printf( "Message #%d Received: %s\n", count, text.c_str() );
   AfxMessageBox(text.c_str());

  } catch (CMSException& e) {
   e.printStackTrace();
  }

  // Commit all messages.
  if( this->sessionTransacted ) {
   session->commit();
  }

  // No matter what, tag the count down latch until done.
  doneLatch.countDown();
 }

 // If something bad happens you see it here as this class is also been
 // registered as an ExceptionListener with the connection.
 virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
  printf("CMS Exception occured.  Shutting down client.\n");
  exit(1);
 }

private:

 void cleanup(){

  //*************************************************
  // Always close destination, consumers and producers before
  // you destroy their sessions and connection.
  //*************************************************

  // Destroy resources.
  try{
   if( destination != NULL ) delete destination;
  }catch (CMSException& e) { e.printStackTrace(); }
  destination = NULL;

  try{
   if( consumer != NULL ) delete consumer;
  }catch (CMSException& e) { e.printStackTrace(); }
  consumer = NULL;

  // Close open resources.
  try{
   if( session != NULL ) session->close();
   if( connection != NULL ) connection->close();
  }catch (CMSException& e) { e.printStackTrace(); }

  // Now Destroy them
  try{
   if( session != NULL ) delete session;
  }catch (CMSException& e) { e.printStackTrace(); }
  session = NULL;

  try{
   if( connection != NULL ) delete connection;
  }catch (CMSException& e) { e.printStackTrace(); }
  connection = NULL;
 }
};


Please help me with this issue.

Regards
venu
-- 
View this message in context: http://www.nabble.com/onMessage-event-not-raised-tp19390593p19390593.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Mime
View raw message