activemq-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: onMessage event not raised
Date Tue, 09 Sep 2008 12:16:54 GMT
Its a bit hard to follow exactly what's been changed in the sample code
here but it almost looks like you are producing on a queue but
attempting to receive on a topic.  

Does the provided sample application that comes with activemq-cpp work
for you out of the box?

Regards
Tim.

On Tue, 2008-09-09 at 04:43 -0700, crazy4venu wrote:
> Dear all, 
> 
> I am writing a trading client and i am unable to check the onMessage event
> not raising.
> 
> i did the following process in creating and using acitvemq consumer and
> producer.
> 
>     try {
>                 // // Create a ConnectionFactory
>                 connectionFactory = new
> ActiveMQConnectionFactory("tcp://localhost:61616");
> 
>                 connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx
>                
> connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx
> 
>                 connection = connectionFactory->createConnection();
>                 connection->start();
> 
>                 // // Create a Session
>                 session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE );
>                 destination = session->createQueue("ors-commands");
> 
>                 AfxMessageBox("session created by producer !");
>                 //
>                 // Create a MessageProducer from the Session to the Topic or
> Queue
>                 producer = session->createProducer( destination );
>                 producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
>         }catch ( CMSException& e )
>         {
>                 e.printStackTrace();
>                 MessageBox("ActiveMQConnectionFactory Error !");
>         }
>                 //AfxBeginThread(startApp,this);
> 
> 
> 
> 
> creating consumer
> ----------------------
>         std::string brokerURI = "tcp://localhost:61616";
>                
>         bool useTopics = true;
>         bool sessionTransacted = false;
>         int numMessages = 20000;
> 
> try {
> 
>         HelloWorldConsumer consumer( brokerURI, numMessages, useTopics,
> sessionTransacted );
> 
>         // Start the consumer thread.
>         Thread consumerThread( &consumer );
>         consumerThread.start();
> 
>         // Wait for the consumer to indicate that its ready to go.
>         consumer.waitUntilReady();
>         consumerThread.join();
> 
> 
>         AfxMessageBox("consumer waiting for the messages !");
> }
> catch ( CMSException& e )
> {
> e.printStackTrace();
> AfxMessageBox("ActiveMQConnection Consumer Factory Error !");
> }
> 
> 
> and consumer class is 
> 
> 
> class HelloWorldConsumer : public ExceptionListener,
>  public MessageListener,
>  public Runnable {
> 
> private:
> 
>  CountDownLatch latch;
>  CountDownLatch doneLatch;
>  Connection* connection;
>  Session* session;
>  Destination* destination;
>  MessageConsumer* consumer;
>  long waitMillis;
>  bool useTopic;
>  bool sessionTransacted;
>  std::string brokerURI;
> 
> public:
> 
>  HelloWorldConsumer( const std::string& brokerURI,
>   long numMessages,
>   bool useTopic = false,
>   bool sessionTransacted = false,
>   long waitMillis = 30000 )
>   : latch(1), doneLatch(numMessages){
>    this->connection = NULL;
>    this->session = NULL;
>    this->destination = NULL;
>    this->consumer = NULL;
>    this->waitMillis = waitMillis;
>    this->useTopic = useTopic;
>    this->sessionTransacted = sessionTransacted;
>    this->brokerURI = brokerURI;
>  }
>  virtual ~HelloWorldConsumer(){
>   cleanup();
>  }
> 
>  void waitUntilReady() {
>   latch.await();
>  }
> 
>  virtual void run() {
> 
>   ConnectionFactory* connectionFactory = NULL;
> 
>   try {
> 
>    // Create a ConnectionFactory
>    connectionFactory =
>     ConnectionFactory::createCMSConnectionFactory( brokerURI );
> 
>    // Create a Connection
>    connection = connectionFactory->createConnection();
> 
>    delete connectionFactory;
>    connectionFactory = NULL;
> 
>    connection->start();
> 
>    connection->setExceptionListener(this);
> 
>    // Create a Session
>    if( this->sessionTransacted == true ) {
>     session = connection->createSession( Session::SESSION_TRANSACTED );
>    } else {
>     session = connection->createSession( Session::AUTO_ACKNOWLEDGE );
>    }
> 
>    // Create the destination (Topic or Queue)
>    if( useTopic ) {
>     destination = session->createTopic( "ors_messages" );
>    } else {
>     destination = session->createQueue( "ors_commands" );
>    }
> 
>    // Create a MessageConsumer from the Session to the Topic or Queue
>    consumer = session->createConsumer( destination );
> 
>    consumer->setMessageListener( this );
> 
>    std::cout.flush();
>    std::cerr.flush();
> 
>    // Indicate we are ready for messages.
>    //latch.countDown();
> 
>    // Wait while asynchronous messages come in.
>    //doneLatch.await( waitMillis );
> 
>   } catch (CMSException& e) {
> 
>    // Indicate we are ready for messages.
>    latch.countDown();
> 
>    delete connectionFactory;
>    connectionFactory = NULL;
> 
>    e.printStackTrace();
>   }
>  }
> 
>  // Called from the consumer since this class is a registered
> MessageListener.
>  virtual void onMessage( const Message* message ){
> 
>   static int count = 0;
> 
>   AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee");
> 
>   try
>   {
>    count++;
>    const TextMessage* textMessage =
>     dynamic_cast< const TextMessage* >( message );
>    string text = "";
> 
>    if( textMessage != NULL ) {
>     text = textMessage->getText();
>    } else {
>     text = "NOT A TEXTMESSAGE!";
>    }
> 
>   // printf( "Message #%d Received: %s\n", count, text.c_str() );
>    AfxMessageBox(text.c_str());
> 
>   } catch (CMSException& e) {
>    e.printStackTrace();
>   }
> 
>   // Commit all messages.
>   if( this->sessionTransacted ) {
>    session->commit();
>   }
> 
>   // No matter what, tag the count down latch until done.
>   doneLatch.countDown();
>  }
> 
>  // If something bad happens you see it here as this class is also been
>  // registered as an ExceptionListener with the connection.
>  virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>   printf("CMS Exception occured.  Shutting down client.\n");
>   exit(1);
>  }
> 
> private:
> 
>  void cleanup(){
> 
>   //*************************************************
>   // Always close destination, consumers and producers before
>   // you destroy their sessions and connection.
>   //*************************************************
> 
>   // Destroy resources.
>   try{
>    if( destination != NULL ) delete destination;
>   }catch (CMSException& e) { e.printStackTrace(); }
>   destination = NULL;
> 
>   try{
>    if( consumer != NULL ) delete consumer;
>   }catch (CMSException& e) { e.printStackTrace(); }
>   consumer = NULL;
> 
>   // Close open resources.
>   try{
>    if( session != NULL ) session->close();
>    if( connection != NULL ) connection->close();
>   }catch (CMSException& e) { e.printStackTrace(); }
> 
>   // Now Destroy them
>   try{
>    if( session != NULL ) delete session;
>   }catch (CMSException& e) { e.printStackTrace(); }
>   session = NULL;
> 
>   try{
>    if( connection != NULL ) delete connection;
>   }catch (CMSException& e) { e.printStackTrace(); }
>   connection = NULL;
>  }
> };
> 
> 
> help me in this issue.
> 
> Regards
> venu


Mime
View raw message