Return-Path: Delivered-To: apmail-activemq-dev-archive@www.apache.org Received: (qmail 58019 invoked from network); 9 Sep 2008 11:44:24 -0000 Received: from hermes.apache.org (HELO mail.apache.org) (140.211.11.2) by minotaur.apache.org with SMTP; 9 Sep 2008 11:44:24 -0000 Received: (qmail 21292 invoked by uid 500); 9 Sep 2008 11:44:22 -0000 Delivered-To: apmail-activemq-dev-archive@activemq.apache.org Received: (qmail 21109 invoked by uid 500); 9 Sep 2008 11:44:21 -0000 Mailing-List: contact dev-help@activemq.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@activemq.apache.org Delivered-To: mailing list dev@activemq.apache.org Received: (qmail 21098 invoked by uid 99); 9 Sep 2008 11:44:21 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Sep 2008 04:44:21 -0700 X-ASF-Spam-Status: No, hits=2.6 required=10.0 tests=DNS_FROM_OPENWHOIS,SPF_HELO_PASS,SPF_PASS,WHOIS_MYPRIVREG X-Spam-Check-By: apache.org Received-SPF: pass (athena.apache.org: domain of lists@nabble.com designates 216.139.236.158 as permitted sender) Received: from [216.139.236.158] (HELO kuber.nabble.com) (216.139.236.158) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 09 Sep 2008 11:43:22 +0000 Received: from isper.nabble.com ([192.168.236.156]) by kuber.nabble.com with esmtp (Exim 4.63) (envelope-from ) id 1Kd1dR-0003md-3j for dev@activemq.apache.org; Tue, 09 Sep 2008 04:43:53 -0700 Message-ID: <19390593.post@talk.nabble.com> Date: Tue, 9 Sep 2008 04:43:53 -0700 (PDT) From: crazy4venu To: dev@activemq.apache.org Subject: onMessage event not raised MIME-Version: 1.0 Content-Type: text/plain; charset=us-ascii Content-Transfer-Encoding: 7bit X-Nabble-From: crazy4venu@gmail.com X-Virus-Checked: Checked by ClamAV on apache.org Dear all, I am writing a trading client and I am unable to work out why the onMessage event is not being raised. 
I did the following to create and use the ActiveMQ consumer and producer. try { // // Create a ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616"); connectionFactory->setUsername((char*)(LPCTSTR)usr);//xxxxx connectionFactory->setPassword((char*)(LPCTSTR)pswd);//xxxxxxx connection = connectionFactory->createConnection(); connection->start(); // // Create a Session session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); destination = session->createQueue("ors-commands"); AfxMessageBox("session created by producer !"); // // Create a MessageProducer from the Session to the Topic or Queue producer = session->createProducer( destination ); producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT ); }catch ( CMSException& e ) { e.printStackTrace(); MessageBox("ActiveMQConnectionFactory Error !"); } //AfxBeginThread(startApp,this); creating consumer ---------------------- std::string brokerURI = "tcp://localhost:61616"; bool useTopics = true; bool sessionTransacted = false; int numMessages = 20000; try { HelloWorldConsumer consumer( brokerURI, numMessages, useTopics, sessionTransacted ); // Start the consumer thread. Thread consumerThread( &consumer ); consumerThread.start(); // Wait for the consumer to indicate that its ready to go. 
consumer.waitUntilReady(); consumerThread.join(); AfxMessageBox("consumer waiting for the messages !"); } catch ( CMSException& e ) { e.printStackTrace(); AfxMessageBox("ActiveMQConnection Consumer Factory Error !"); } and consumer class is class HelloWorldConsumer : public ExceptionListener, public MessageListener, public Runnable { private: CountDownLatch latch; CountDownLatch doneLatch; Connection* connection; Session* session; Destination* destination; MessageConsumer* consumer; long waitMillis; bool useTopic; bool sessionTransacted; std::string brokerURI; public: HelloWorldConsumer( const std::string& brokerURI, long numMessages, bool useTopic = false, bool sessionTransacted = false, long waitMillis = 30000 ) : latch(1), doneLatch(numMessages){ this->connection = NULL; this->session = NULL; this->destination = NULL; this->consumer = NULL; this->waitMillis = waitMillis; this->useTopic = useTopic; this->sessionTransacted = sessionTransacted; this->brokerURI = brokerURI; } virtual ~HelloWorldConsumer(){ cleanup(); } void waitUntilReady() { latch.await(); } virtual void run() { ConnectionFactory* connectionFactory = NULL; try { // Create a ConnectionFactory connectionFactory = ConnectionFactory::createCMSConnectionFactory( brokerURI ); // Create a Connection connection = connectionFactory->createConnection(); delete connectionFactory; connectionFactory = NULL; connection->start(); connection->setExceptionListener(this); // Create a Session if( this->sessionTransacted == true ) { session = connection->createSession( Session::SESSION_TRANSACTED ); } else { session = connection->createSession( Session::AUTO_ACKNOWLEDGE ); } // Create the destination (Topic or Queue) if( useTopic ) { destination = session->createTopic( "ors_messages" ); } else { destination = session->createQueue( "ors_commands" ); } // Create a MessageConsumer from the Session to the Topic or Queue consumer = session->createConsumer( destination ); consumer->setMessageListener( this ); 
std::cout.flush(); std::cerr.flush(); // Indicate we are ready for messages. //latch.countDown(); // Wait while asynchronous messages come in. //doneLatch.await( waitMillis ); } catch (CMSException& e) { // Indicate we are ready for messages. latch.countDown(); delete connectionFactory; connectionFactory = NULL; e.printStackTrace(); } } // Called from the consumer since this class is a registered MessageListener. virtual void onMessage( const Message* message ){ static int count = 0; AfxMessageBox("on Messageeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee"); try { count++; const TextMessage* textMessage = dynamic_cast< const TextMessage* >( message ); string text = ""; if( textMessage != NULL ) { text = textMessage->getText(); } else { text = "NOT A TEXTMESSAGE!"; } // printf( "Message #%d Received: %s\n", count, text.c_str() ); AfxMessageBox(text.c_str()); } catch (CMSException& e) { e.printStackTrace(); } // Commit all messages. if( this->sessionTransacted ) { session->commit(); } // No matter what, tag the count down latch until done. doneLatch.countDown(); } // If something bad happens you see it here as this class is also been // registered as an ExceptionListener with the connection. virtual void onException( const CMSException& ex AMQCPP_UNUSED) { printf("CMS Exception occured. Shutting down client.\n"); exit(1); } private: void cleanup(){ //************************************************* // Always close destination, consumers and producers before // you destroy their sessions and connection. //************************************************* // Destroy resources. try{ if( destination != NULL ) delete destination; }catch (CMSException& e) { e.printStackTrace(); } destination = NULL; try{ if( consumer != NULL ) delete consumer; }catch (CMSException& e) { e.printStackTrace(); } consumer = NULL; // Close open resources. 
try{ if( session != NULL ) session->close(); if( connection != NULL ) connection->close(); }catch (CMSException& e) { e.printStackTrace(); } // Now Destroy them try{ if( session != NULL ) delete session; }catch (CMSException& e) { e.printStackTrace(); } session = NULL; try{ if( connection != NULL ) delete connection; }catch (CMSException& e) { e.printStackTrace(); } connection = NULL; } }; Please help me with this issue. Regards, venu -- View this message in context: http://www.nabble.com/onMessage-event-not-raised-tp19390593p19390593.html Sent from the ActiveMQ - Dev mailing list archive at Nabble.com.