activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From moonbird <moonb...@ymail.com>
Subject Re: ActiveMQ c++ and Java
Date Sat, 29 Aug 2009 16:51:28 GMT

this is my java application:
...
		// Create connection. Create session from connection; false means the
		// session is not transacted. Create requestor and text message. Send
		// the message, wait for the answer and finally close session and
		// connection.
		try
		{
			queueConnection = queueConnectionFactory.createQueueConnection();
			// A JMS connection is created in stopped mode: nothing is delivered
			// to its consumers until start() is called. The QueueRequestor
			// below waits for the reply on a consumer of this connection, so
			// without start() the request() call would never return.
			queueConnection.start();

			// false: the session is not transacted; replies are acknowledged
			// automatically.
			queueSession = queueConnection.createQueueSession(false,
					Session.AUTO_ACKNOWLEDGE);

			// The queue is obtained by name through the session.
			// Open question from the author: why does it not work to look up
			// the ActiveMQ queue via JNDI?
			stationInfoQueue = queueSession.createQueue("stationInfoQueue");
			textMessage = queueSession.createTextMessage(myXMLRequestAsString);

			// javax.jms.QueueRequestor creates a TemporaryQueue for the
			// responses and provides a request method that sends the request
			// message and waits for its reply. This is a basic request/reply
			// abstraction that should be sufficient for most uses. JMS
			// providers and clients are free to create more sophisticated
			// versions.
			queueRequestor = new QueueRequestor(queueSession, stationInfoQueue);

			// Sends the message and blocks until the response is received.
			TextMessage answer = (TextMessage) queueRequestor.request(textMessage);
			System.out.println("CLIENT: Response message received: ");
			System.out.println(answer.getText());
		}
		catch (JMSException e)
		{
			System.out.println("JMSExceptionn occurred:" + e);
		}
...

And this is my C++ application (it is a modified version of the Apache
example SimpleAsyncConsumer.cpp):

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleAsyncConsumer : public ExceptionListener,
                            public MessageListener {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    bool clientAck;
    std::string brokerURI;
    std::string destURI;

public:

    SimpleAsyncConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) {
        connection = NULL;
        session = NULL;
        destination = NULL;
        consumer = NULL;
        this->useTopic = useTopic;
        this->brokerURI = brokerURI;
        this->destURI = destURI;
        this->clientAck = clientAck;
    }

    virtual ~SimpleAsyncConsumer(){
        cleanup();
    }

    void runConsumer() {

        try {

            // Create a ConnectionFactory
            ActiveMQConnectionFactory* connectionFactory =
                new ActiveMQConnectionFactory( brokerURI );

            // Create a Connection
            connection = connectionFactory->createConnection();
            delete connectionFactory;
            connection->start();

            connection->setExceptionListener(this);

            // Create a Session
            if( clientAck ) {
                session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }

            // Create a MessageConsumer from the Session to the Topic or
Queue
            consumer = session->createConsumer( destination );
            consumer->setMessageListener( this );

        } catch (CMSException& e) {
            e.printStackTrace();
        }
    }

    // Called from the consumer since this class is a registered
MessageListener.
    virtual void onMessage( const Message* message ){

        static int count = 0;

        try
        {
            count++;
            const TextMessage* textMessage =
                dynamic_cast< const TextMessage* >( message );
            string text = "";

            if( textMessage != NULL ) 
            {
                text = textMessage->getText();
            } 
            else 
            {
                text = "NOT A TEXTMESSAGE!";
            }

            if( clientAck ) 
            {
                message->acknowledge();
            }

            printf( "Message #%d Received: %s\n", count, text.c_str() );
            
            //sendReply(textMessage);
        } 
        catch (CMSException& e) 
        {
            e.printStackTrace();
        }
        sendReply(message);
    }

    // If something bad happens you see it here as this class is also been
    // registered as an ExceptionListener with the connection.
    virtual void onException( const CMSException& ex AMQCPP_UNUSED) {
        printf("CMS Exception occurred.  Shutting down client.\n");
        exit(1);
    }

	void sendReply(const Message* msg)
	{
		printf( "started sendrReply()");
		
		try 
		{
            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );

            try
            {
            	connection = connectionFactory->createConnection();
                    connection->start();
            }
            catch( CMSException& e ) 
            {
                    e.printStackTrace();
            }
            // Create a Session
            session = connection->createSession( Session::AUTO_ACKNOWLEDGE
);
            

            // Create the destination (Topic or Queue)
            destination = const_cast<Destination*>(msg->getCMSReplyTo());
            

            // Create a MessageProducer from the Session to the Topic or
Queue
            MessageProducer* producer = session->createProducer( destination
);
            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            // Create a messages
            string text = (string)"Hello World back";
            TextMessage* message = session->createTextMessage( text );

        
            // Tell the producer to send the message
            producer->send( message );

            delete message;
        

        }
        catch ( CMSException& e ) {
            e.printStackTrace();
        }
	}
	
private:

    void cleanup(){

        //*************************************************
        // Always close destination, consumers and producers before
        // you destroy their sessions and connection.
        //*************************************************

        // Destroy resources.
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException& e) { e.printStackTrace(); }
        destination = NULL;

        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException& e) { e.printStackTrace(); }
        consumer = NULL;

        // Close open resources.
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) { e.printStackTrace(); }

        // Now Destroy them
        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) { e.printStackTrace(); }
        connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
/**
 * Entry point of the example: initializes the ActiveMQ-CPP library, runs a
 * SimpleAsyncConsumer on the "stationInfoQueue" queue until the user enters
 * 'q' (or standard input ends), then shuts the library down.
 */
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    activemq::library::ActiveMQCPP::initializeLibrary();

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    std::string brokerURI =
        "failover:(tcp://127.0.0.1:61616"
//        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"
        ")";

    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the consumer listens, to have the consumer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    //std::string destURI = "TEST.FOO"; //?consumer.prefetchSize=1";
    std::string destURI = "stationInfoQueue"; //?consumer.prefetchSize=1";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the consumer.
    //============================================================
    bool useTopics = false;

    //============================================================
    // set to true if you want the consumer to use client ack mode
    // instead of the default auto ack mode.
    //============================================================
    bool clientAck = false;

    // The consumer lives in its own scope so that its destructor, which
    // closes and deletes the CMS resources, runs BEFORE shutdownLibrary().
    // As a plain local of main it would be destroyed only after the library
    // had already been shut down.
    {
        // Create the consumer
        SimpleAsyncConsumer consumer( brokerURI, destURI, useTopics, clientAck );

        // Start it up and it will listen until the user quits.
        consumer.runConsumer();

        // Wait to exit. Checking the stream state as well ends the loop when
        // standard input is closed; otherwise get() would keep returning EOF
        // and the loop would spin forever.
        std::cout << "Press 'q' to quit" << std::endl;
        while( std::cin.good() && std::cin.get() != 'q' ) {}
    }

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;
}


...What I also don't understand is why "sendReply(message);" only seems to be
called AFTER I have pressed a key.
... And then I get this error on the console:
-----------------------------------------------------
Finished with the example.
=====================================================
simple_async_consumer(1639,0xa0738720) malloc: *** error for object
0x106a98: Non-aligned pointer being freed
*** set a breakpoint in malloc_error_break to debug

-- 
View this message in context: http://www.nabble.com/ActiveMQ-c%2B%2B-and-Java-tp25190274p25204452.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message