activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From crazy4venu <crazy4v...@gmail.com>
Subject Re: onMessage event not raised
Date Tue, 09 Sep 2008 13:05:11 GMT

Hello Tim,

Thanks for your early reply and great support.

and the provided sample application works for me and i am able to send and
receive messages.

coming to mycode 

1. connecting to Marketcetera JMS server (on tcp://<machine>:61616 by
default).
2. creating a FIX message, droping it on the oms-commands queue.
3. Listen for execution reports on the oms-messages topic. 

regards
venu

Timothy Bish wrote:
> 
> Its a bit hard to follow exactly what's been changed in the sample code
> here but it almost looks like you are producing on a queue but
> attempting to receive on a topic.  
> 
> Does the provided sample application that comes with activemq-cpp work
> for you out of the box?
> 
> Regards
> Tim.
> 
> On Tue, 2008-09-09 at 04:43 -0700, crazy4venu wrote:
>> Dear all, 
>> 
>> I am writing a trading client and i am unable to check the onMessage
>> event
>> not raising.
>> 
>> i did the following process in creating and using acitvemq consumer and
>> producer.
>> 
>>     try {
>>                 // // Create a ConnectionFactory
>>                 connectionFactory = new
>> ActiveMQConnectionFactory("tcp://localhost:61616");
>> 
>>                
>> connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
>>                
>> connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx
>> 
>>                 connection = connectionFactory->createConnection();
>>                 connection->start();
>> 
>>                 // // Create a Session
>>                 session = connection->createSession(
>> Session::AUTO_ACKNOWLEDGE );
>>                 destination = session->createQueue("ors-commands");
>> 
>>                 AfxMessageBox("session created by producer !");
>>                 //
>>                 // Create a MessageProducer from the Session to the Topic
>> or
>> Queue
>>                 producer = session->createProducer( destination );
>>                 producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT
>> );
>>         }catch ( CMSException& e )
>>         {
>>                 e.printStackTrace();
>>                 MessageBox("ActiveMQConnectionFactory Error !");
>>         }
>>                 //AfxBeginThread(startApp,this);
>> 
>> 
>> 
>> 
>> creating consumer
>> ----------------------
>>         std::string brokerURI = "tcp://localhost:61616";
>>                
>>         bool useTopics = true;
>>         bool sessionTransacted = false;
>>         int numMessages = 20000;
>> 
>> try {
>> 
>>         HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
>> sessionTransacted );
>> 
>>         // Start the consumer thread.
>>         Thread consumerThread( &consumer );
>>         consumerThread.start();
>> 
>>         // Wait for the consumer to indicate that its ready to go.
>>         consumer.waitUntilReady();
>>         consumerThread.join();
>> 
>> 
>>         AfxMessageBox("consumer waiting for the messages !");
>> }
>> catch ( CMSException& e )
>> {
>> e.printStackTrace();
>> AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
>> }
>> 
>> 
>> and consumer class is 
>> 
>> 
>> class HelloWorldConsumer : public ExceptionListener,
>>  public MessageListener,
>>  public Runnable {
>> 
>> private:
>> 
>>  CountDownLatch latch;
>>  CountDownLatch doneLatch;
>>  Connection* connection;
>>  Session* session;
>>  Destination* destination;
>>  MessageConsumer* consumer;
>>  long waitMillis;
>>  bool useTopic;
>>  bool sessionTransacted;
>>  std::string brokerURI;
>> 
>> public:
>> 
>>  HelloWorldConsumer( const std::string& brokerURI,
>>   long numMessages,
>>   bool useTopic = false,
>>   bool sessionTransacted = false,
>>   long waitMillis = 30000 )
>>   : latch(1), doneLatch(numMessages){
>>    this->connection = NULL;
>>    this->session = NULL;
>>    this->destination = NULL;
>>    this->consumer = NULL;
>>    this->waitMillis = waitMillis;
>>    this->useTopic = useTopic;
>>    this->sessionTransacted = sessionTransacted;
>>    this->brokerURI = brokerURI;
>>  }
>>  virtual ~HelloWorldConsumer(){
>>   cleanup();
>>  }
>> 
>>  void waitUntilReady() {
>>   latch.await();
>>  }
>> 
>>  virtual void run() {
>> 
>>   ConnectionFactory* connectionFactory = NULL;
>> 
>>   try {
>> 
>>    // Create a ConnectionFactory
>>    connectionFactory =
>>     ConnectionFactory::createCMSConnectionFactory( brokerURI );
>> 
>>    // Create a Connection
>>    connection = connectionFactory->createConnection();
>> 
>>    delete connectionFactory;
>>    connectionFactory = NULL;
>> 
>>    connection->start();
>> 
>>    connection->setExceptionListener(this);
>> 
>>    // Create a Session
>>    if( this->sessionTransacted == true ) {
>>     session = connection->createSession( Session::SESSION_TRANSACTED );
>>    } else {
>>     session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>>    }
>> 
>>    // Create the destination (Topic or Queue)
>>    if( useTopic ) {
>>     destination = session->createTopic( "ors_messages" );
>>    } else {
>>     destination = session->createQueue( "ors_commands" );
>>    }
>> 
>>    // Create a MessageConsumer from the Session to the Topic or Queue
>>    consumer = session->createConsumer( destination );
>> 
>>    consumer->setMessageListener( this );
>> 
>>    std::cout.flush();
>>    std::cerr.flush();
>> 
>>    // Indicate we are ready for messages.
>>    //latch.countDown();
>> 
>>    // Wait while asynchronous messages come in.
>>    //doneLatch.await( waitMillis );
>> 
>>   } catch (CMSException& e) {
>> 
>>    // Indicate we are ready for messages.
>>    latch.countDown();
>> 
>>    delete connectionFactory;
>>    connectionFactory = NULL;
>> 
>>    e.printStackTrace();
>>   }
>>  }
>> 
>>  // Called from the consumer since this class is a registered
>> MessageListener.
>>  virtual void onMessage( const Message* message ){
>> 
>>   static int count = 0;
>> 
>>   AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
>> 
>>   try
>>   {
>>    count++;
>>    const TextMessage* textMessage =
>>     dynamic_cast< const TextMessage* >( message );
>>    string text = "";
>> 
>>    if( textMessage != NULL ) {
>>     text = textMessage->getText();
>>    } else {
>>     text = "NOT A TEXTMESSAGE!";
>>    }
>> 
>>   // printf( "Message #%d Received: %s\n", count, text.c_str() );
>>    AfxMessageBox(text.c_str());
>> 
>>   } catch (CMSException& e) {
>>    e.printStackTrace();
>>   }
>> 
>>   // Commit all messages.
>>   if( this->sessionTransacted ) {
>>    session->commit();
>>   }
>> 
>>   // No matter what, tag the count down latch until done.
>>   doneLatch.countDown();
>>  }
>> 
>>  // If something bad happens you see it here as this class is also been
>>  // registered as an ExceptionListener with the connection.
>>  virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>>   printf("CMS Exception occured.  Shutting down client.\n");
>>   exit(1);
>>  }
>> 
>> private:
>> 
>>  void cleanup(){
>> 
>>   //*************************************************
>>   // Always close destination, consumers and producers before
>>   // you destroy their sessions and connection.
>>   //*************************************************
>> 
>>   // Destroy resources.
>>   try{
>>    if( destination != NULL ) delete destination;
>>   }catch (CMSException& e) { e.printStackTrace(); }
>>   destination = NULL;
>> 
>>   try{
>>    if( consumer != NULL ) delete consumer;
>>   }catch (CMSException& e) { e.printStackTrace(); }
>>   consumer = NULL;
>> 
>>   // Close open resources.
>>   try{
>>    if( session != NULL ) session->close();
>>    if( connection != NULL ) connection->close();
>>   }catch (CMSException& e) { e.printStackTrace(); }
>> 
>>   // Now Destroy them
>>   try{
>>    if( session != NULL ) delete session;
>>   }catch (CMSException& e) { e.printStackTrace(); }
>>   session = NULL;
>> 
>>   try{
>>    if( connection != NULL ) delete connection;
>>   }catch (CMSException& e) { e.printStackTrace(); }
>>   connection = NULL;
>>  }
>> };
>> 
>> 
>> help me in this issue.
>> 
>> Regards
>> venu
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/onMessage-event-not-raised-tp19390593p19391920.html
Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.


Mime
View raw message