activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: ActiveMQ c++ and Java
Date Mon, 31 Aug 2009 14:22:27 GMT
I've added my comments inline.

On Sat, 2009-08-29 at 09:51 -0700, moonbird wrote:
> this is my java application:
> ...
> // Create connection. Create session from connection; false means
>         // session is not transacted. Create requestor and text message.
> Send
>         // messages, wait for answer and finally close session and
> connection.
> 		try
> 		{
> 			queueConnection = queueConnectionFactory.createQueueConnection();

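Try adding a call to queueConnection.start() here; that should solve
your problem. A JMS connection is created in stopped mode and delivers
no incoming messages until it is started, so the requestor never sees
the reply.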


> 			queueSession = queueConnection.createQueueSession(false,
> Session.AUTO_ACKNOWLEDGE);
> 			// obviously this is needed ??? why doesnt it work to lookup the activemq
> queue via jndi ?
> 			stationInfoQueue = queueSession.createQueue("stationInfoQueue");
> 			textMessage = queueSession.createTextMessage(myXMLRequestAsString);
> 			// javax.jms.QueueRequestor creates a TemporaryQueue for the responses
> and provides a request method that sends the request message 
> 			// and waits for its reply.This is a basic request/reply abstraction that
> should be sufficient for most uses. 
> 			// JMS providers and clients are free to create more sophisticated
> versions.
> 			queueRequestor = new QueueRequestor(queueSession, stationInfoQueue);
> 			//sends the message and waits until respond is received
> 			TextMessage answer = (TextMessage) queueRequestor.request(textMessage);
> 			System.out.println("CLIENT: Response message received: ");
> 			System.out.println(answer.getText());
> 		}
> 		catch (JMSException e)
> 		{
> 			System.out.println("JMSExceptionn occurred:" + e);
> 		}
> ...
> 
> and this is my c++ application (it is a modified version of the apache
> example SimpleAsyncConsumer.cpp):
> 
> /*
>  * Licensed to the Apache Software Foundation (ASF) under one or more
>  * contributor license agreements.  See the NOTICE file distributed with
>  * this work for additional information regarding copyright ownership.
>  * The ASF licenses this file to You under the Apache License, Version 2.0
>  * (the "License"); you may not use this file except in compliance with
>  * the License.  You may obtain a copy of the License at
>  *
>  *     http://www.apache.org/licenses/LICENSE-2.0
>  *
>  * Unless required by applicable law or agreed to in writing, software
>  * distributed under the License is distributed on an "AS IS" BASIS,
>  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
>  * See the License for the specific language governing permissions and
>  * limitations under the License.
>  */
> 
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <iostream>
> 
> using namespace activemq;
> using namespace activemq::core;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> 
> ////////////////////////////////////////////////////////////////////////////////
> class SimpleAsyncConsumer : public ExceptionListener,
>                             public MessageListener {
> private:
> 
>     Connection* connection;
>     Session* session;
>     Destination* destination;
>     MessageConsumer* consumer;
>     bool useTopic;
>     bool clientAck;
>     std::string brokerURI;
>     std::string destURI;
> 
> public:
> 
>     SimpleAsyncConsumer( const std::string& brokerURI,
>                          const std::string& destURI,
>                          bool useTopic = false,
>                          bool clientAck = false ) {
>         connection = NULL;
>         session = NULL;
>         destination = NULL;
>         consumer = NULL;
>         this->useTopic = useTopic;
>         this->brokerURI = brokerURI;
>         this->destURI = destURI;
>         this->clientAck = clientAck;
>     }
> 
>     virtual ~SimpleAsyncConsumer(){
>         cleanup();
>     }
> 
>     void runConsumer() {
> 
>         try {
> 
>             // Create a ConnectionFactory
>             ActiveMQConnectionFactory* connectionFactory =
>                 new ActiveMQConnectionFactory( brokerURI );
> 
>             // Create a Connection
>             connection = connectionFactory->createConnection();
>             delete connectionFactory;
>             connection->start();
> 
>             connection->setExceptionListener(this);
> 
>             // Create a Session
>             if( clientAck ) {
>                 session = connection->createSession(
> Session::CLIENT_ACKNOWLEDGE );
>             } else {
>                 session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE );
>             }
> 
>             // Create the destination (Topic or Queue)
>             if( useTopic ) {
>                 destination = session->createTopic( destURI );
>             } else {
>                 destination = session->createQueue( destURI );
>             }
> 
>             // Create a MessageConsumer from the Session to the Topic or
> Queue
>             consumer = session->createConsumer( destination );
>             consumer->setMessageListener( this );
> 
>         } catch (CMSException& e) {
>             e.printStackTrace();
>         }
>     }
> 
>     // Called from the consumer since this class is a registered
> MessageListener.
>     virtual void onMessage( const Message* message ){
> 
>         static int count = 0;
> 
>         try
>         {
>             count++;
>             const TextMessage* textMessage =
>                 dynamic_cast< const TextMessage* >( message );
>             string text = "";
> 
>             if( textMessage != NULL ) 
>             {
>                 text = textMessage->getText();
>             } 
>             else 
>             {
>                 text = "NOT A TEXTMESSAGE!";
>             }
> 
>             if( clientAck ) 
>             {
>                 message->acknowledge();
>             }
> 
>             printf( "Message #%d Received: %s\n", count, text.c_str() );
>             
>             //sendReply(textMessage);
>         } 
>         catch (CMSException& e) 
>         {
>             e.printStackTrace();
>         }
>         sendReply(message);
>     }
> 
>     // If something bad happens you see it here as this class is also been
>     // registered as an ExceptionListener with the connection.
>     virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
>         printf("CMS Exception occurred.  Shutting down client.\n");
>         exit(1);
>     }
> 

This method causes several memory leaks that you should address.
Overwriting the session, connection and destination pointers of the
consumer class is probably not what you should be doing here.  Also,
there really isn't a need to create an entirely new connection and
session here; just reuse the existing ones.

> 	void sendReply(const Message* msg)
> 	{
> 		printf( "started sendrReply()");
> 		
> 		try 
> 		{
>             // Create a ConnectionFactory
>             auto_ptr<ActiveMQConnectionFactory> connectionFactory(
>                 new ActiveMQConnectionFactory( brokerURI ) );
> 
>             try
>             {
>             	connection = connectionFactory->createConnection();
>                     connection->start();
>             }
>             catch( CMSException& e ) 
>             {
>                     e.printStackTrace();
>             }
>             // Create a Session
>             session = connection->createSession( Session::AUTO_ACKNOWLEDGE
> );
>             
> 
>             // Create the destination (Topic or Queue)

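This should be something more along the lines of the following, so that
you own a copy of the reply-to destination instead of casting away const
on a pointer that belongs to the message:

std::auto_ptr<cms::Destination> responseDestination( msg->getCMSReplyTo()->clone() );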

>             destination = const_cast<Destination*>(msg->getCMSReplyTo());
>             
> 
>             // Create a MessageProducer from the Session to the Topic or
> Queue

You never delete this producer, which is another leak.

>             MessageProducer* producer = session->createProducer( destination
> );
>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
> 
>             // Create a messages
>             string text = (string)"Hello World back";
>             TextMessage* message = session->createTextMessage( text );
> 
>         
>             // Tell the producer to send the message
>             producer->send( message );
> 
>             delete message;
>         
> 
>         }
>         catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
> 	}
> 	
> private:
> 
>     void cleanup(){
> 
>         //*************************************************
>         // Always close destination, consumers and producers before
>         // you destroy their sessions and connection.
>         //*************************************************
> 
>         // Destroy resources.
>         try{
>             if( destination != NULL ) delete destination;
>         }catch (CMSException& e) { e.printStackTrace(); }
>         destination = NULL;
> 
>         try{
>             if( consumer != NULL ) delete consumer;
>         }catch (CMSException& e) { e.printStackTrace(); }
>         consumer = NULL;
> 
>         // Close open resources.
>         try{
>             if( session != NULL ) session->close();
>             if( connection != NULL ) connection->close();
>         }catch (CMSException& e) { e.printStackTrace(); }
> 
>         // Now Destroy them
>         try{
>             if( session != NULL ) delete session;
>         }catch (CMSException& e) { e.printStackTrace(); }
>         session = NULL;
> 
>         try{
>             if( connection != NULL ) delete connection;
>         }catch (CMSException& e) { e.printStackTrace(); }
>         connection = NULL;
>     }
> };
> 
> ////////////////////////////////////////////////////////////////////////////////
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
> 
>     activemq::library::ActiveMQCPP::initializeLibrary();
> 
>     std::cout << "=====================================================\n";
>     std::cout << "Starting the example:" << std::endl;
>     std::cout << "-----------------------------------------------------\n";
> 
>     // Set the URI to point to the IPAddress of your broker.
>     // add any optional params to the url to enable things like
>     // tightMarshalling or tcp logging etc.  See the CMS web site for
>     // a full list of configuration options.
>     //
>     //  http://activemq.apache.org/cms/
>     //
>     // Wire Format Options:
>     // =====================
>     // Use either stomp or openwire, the default ports are different for
> each
>     //
>     // Examples:
>     //    tcp://127.0.0.1:61616                      default to openwire
>     //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
>     //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
>     //
>     std::string brokerURI =
>         "failover:(tcp://127.0.0.1:61616"
> //        "?wireFormat=openwire"
> //        "&connection.useAsyncSend=true"
> //        "&transport.commandTracingEnabled=true"
> //        "&transport.tcpTracingEnabled=true"
> //        "&wireFormat.tightEncodingEnabled=true"
>         ")";
> 
>     //============================================================
>     // This is the Destination Name and URI options.  Use this to
>     // customize where the consumer listens, to have the consumer
>     // use a topic or queue set the 'useTopics' flag.
>     //============================================================
>     //std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
>     std::string destURI = "stationInfoQueue"; //?consumer.prefetchSize=1";
> 
> 
>     //============================================================
>     // set to true to use topics instead of queues
>     // Note in the code above that this causes createTopic or
>     // createQueue to be used in the consumer.
>     //============================================================
>     bool useTopics = false;
> 
>     //============================================================
>     // set to true if you want the consumer to use client ack mode
>     // instead of the default auto ack mode.
>     //============================================================
>     bool clientAck = false;
> 
>     // Create the consumer
>     SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck
> );
> 
>     // Start it up and it will listen forever.
>     consumer.runConsumer();
> 
>     // Wait to exit.
>     std::cout << "Press 'q' to quit" << std::endl;
>     while( std::cin.get() != 'q') {}
> 
>     std::cout << "-----------------------------------------------------\n";
>     std::cout << "Finished with the example." << std::endl;
>     std::cout << "=====================================================\n";
> 
>     activemq::library::ActiveMQCPP::shutdownLibrary();
> }
> 
> 
> ...what I also dont understand is why "sendReply(message);" is called AFTER
> I pressed a key ?
> ... and then I get this error on the console:
> -----------------------------------------------------
> Finished with the example.
> =====================================================
> simple_async_consumer(1639,0xa0738720) malloc: *** error for object
> 0x106a98: Non-aligned pointer being freed
> *** set a breakpoint in malloc_error_break to debug
> 
-- 
Tim Bish
http://fusesource.com
http://timbish.blogspot.com/




Mime
View raw message