activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: how to implement multiple message producers in activemq cpp
Date Tue, 17 Mar 2009 22:15:13 GMT
Here is my version of the code which runs to completion without
segfaults.



/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version
2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <decaf/lang/Integer.h>
#include <decaf/util/Date.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/util/Config.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <iostream>
#include <memory>

using namespace activemq;
using namespace activemq::core;
using namespace decaf;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination1;
    Destination* destination2;
    MessageProducer* producer1;
    MessageProducer* producer2;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI1;
    std::string destURI2;
    unsigned int connectRetries;

public:

    SimpleProducer( const std::string& brokerURI,
                    unsigned int numMessages,
                    const std::string& destURI1,
                    const std::string& destURI2,
                    bool useTopic = false,
                    bool clientAck = false,
                    unsigned int connectRetries = 0 ){
        connection = NULL;
        session = NULL;
        destination1 = NULL;
        destination2 = NULL;
        producer1 = NULL;
        producer2 = NULL;
        this->numMessages = numMessages;
        this->useTopic = useTopic;
        this->brokerURI = brokerURI;
        this->destURI1 = destURI1;
        this->destURI2 = destURI2;
        this->clientAck = clientAck;
        this->connectRetries = 0;
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    void setConnectRetries( unsigned int retries ) {
        this->connectRetries = retries;
    }

    unsigned int getConnectRetries() const {
        return this->connectRetries;
    }

    virtual void run() {
        try {
            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );

            unsigned int retries = this->connectRetries;
            do{
                // Create a Connection
                try{
                    connection = connectionFactory->createConnection();
                    connection->start();
                } catch( CMSException& e ) {
                    e.printStackTrace();

                    if( retries == 0 ) {
                        return;
                    }
                }
            } while( retries-- != 0 );

            // Create a Session
            if( clientAck ) {
                session =
connection->createSession( Session::CLIENT_ACKNOWLEDGE );
            } else {
                session =
connection->createSession( Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination1 = session->createTopic( destURI1 );
                destination2 = session->createTopic( destURI2 );
            } else {
                destination1 = session->createQueue( destURI1 );
                destination2 = session->createQueue( destURI2 );
            }

            // Create a MessageProducer from the Session to the Topic or
Queue
            producer1 = session->createProducer( destination1 );
            producer2 = session->createProducer( destination2 );
            producer1->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
            producer2->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );

            // Create a messages
            string text = (string)"Hello world! from thread " +
threadIdStr;

            for( unsigned int ix=0; ix<numMessages; ++ix ){
                TextMessage* message =
session->createTextMessage( text );

                message->setIntProperty( "Integer", ix );

                // Tell the producer to send the message
                printf( "Sent message #%d from thread %s\n", ix+1,
threadIdStr.c_str() );
                producer1->send( message );
                producer2->send( message );

                delete message;
            }

        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    void cleanup(){

        // Destroy resources.
        try{
            if( destination1 != NULL ) delete destination1;
            if( destination2 != NULL ) delete destination2;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination1 = NULL;
        destination2 = NULL;

        try{
            if( producer1 != NULL ) delete producer1;
            if( producer2 != NULL ) delete producer2;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer1 = NULL;
        producer2 = NULL;

        // Close open resources.
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }

        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {

    std::cout << "=====================================================
\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout <<
"-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for
each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    std::string brokerURI =
        "tcp://127.0.0.1:61616"
        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"
        ;

    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 2000;

    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI1 = "TEST.FOO";
    std::string destURI2 = "TEST.BAR";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = false;

    // Pass an integer value to the producer for retry
    unsigned int connectRetries = 0;

    if( argc > 1 ) {
        try {
            connectRetries = decaf::lang::Integer::parseInt( argv[1] );
        } catch( decaf::lang::exceptions::NumberFormatException& ex ) {
            connectRetries = 0;
        }
    }

    // Create the producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI1, destURI2,
useTopics );
    producer.setConnectRetries( connectRetries );
    producer.run();

    std::cout <<
"-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================
\n";
}


On Tue, 2009-03-17 at 08:24 -0700, manua wrote:
> Hi Tim,
> 
> I have pasted the main content of my file here.
> 
> Actually, I wanted to have a single session and multiple message producers
> in that session.Each producer will be sending a different message o its
> respective topic/queue. 
> 
> In the single program, if I create different connection and different
> sessions, then the program is working fine, but giving issues with multiple
> message producers in the single session.
> 
> 
> class SimpleProducer : public Runnable {
> private:
> 
>     Connection* connection;
>     Session* session;
>     Destination* destination;
>     Destination* destination2;
>     MessageProducer* producer;
>     MessageProducer* producer2;
>     bool useTopic;
>     bool clientAck;
>     unsigned int numMessages;
>     std::string brokerURI;
>     std::string destURI;
>     std::string destURI2;
>     unsigned int connectRetries;
> 
> public:
> 
>     SimpleProducer( const std::string& brokerURI,
>                     unsigned int numMessages,
>                     const std::string& destURI,
>                     bool useTopic = false,
>                     bool clientAck = false,
>                     unsigned int connectRetries = 0 ){
>         connection = NULL;
>         session = NULL;
>         destination = NULL;
>         producer = NULL;
>         this->numMessages = numMessages;
>         this->useTopic = useTopic;
>         this->brokerURI = brokerURI;
>         this->destURI = destURI;
>         this->clientAck = clientAck;
>         this->connectRetries = 0;
>     }
> 
>     virtual ~SimpleProducer(){
>         cleanup();
>     }
> 
>     void setConnectRetries( unsigned int retries ) {
>         this->connectRetries = retries;
>     }
>     unsigned int getConnectRetries() const {
>         return this->connectRetries;
>     }
> 
>     virtual void run() {
>         try {
>             // Create a ConnectionFactory
>             auto_ptr<ActiveMQConnectionFactory> connectionFactory(
>                 new ActiveMQConnectionFactory( brokerURI ) );
> 
>             unsigned int retries = this->connectRetries;
>             do{
>                 // Create a Connection
>                 try{
>                     connection = connectionFactory->createConnection();
>                     connection->start();
>                 } catch( CMSException& e ) {
>                     e.printStackTrace();
> 
>                     if( retries == 0 ) {
>                         return;
>                     }
>                 }
>             } while( retries-- != 0 );
> 
>             // Create a Session
>             if( clientAck ) {
>                 session = connection->createSession(
> Session::CLIENT_ACKNOWLEDGE );
>             } else {
>                 session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE );
>             }
> 
>             // Create the destination (Topic or Queue)
>             if( useTopic ) {
>                 destination = session->createTopic( destURI );
>                 destination2 = session->createTopic( destURI2 ); 
>             } else {
>                 destination = session->createQueue( destURI );
>                 destination2 = session->createQueue( destURI2 );
>             }
> 
>             // Create a MessageProducer from the Session to the Topic or
> Queue
>             producer = session->createProducer( destination );
>             producer2 = session->createProducer( destination2 );
> 
>             producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
> 
>             producer2->setDeliveryMode( DeliveryMode::NON_PERSISTENT );
> 
>             // Create the Thread Id String
>             string threadIdStr = Integer::toString( Thread::getId() );
> 
>             // Create a messages
>             string text = (string)"Hello world! from thread " + threadIdStr;
> 
>                 string text2 = (string)"Message for second producer ";
> 
>             for( unsigned int ix=0; ix<numMessages; ++ix ){
>                 TextMessage* message = session->createTextMessage( text );
> 
>                 TextMessage* message2 = session->createTextMessage( text2 );
> 
>                 message->setIntProperty( "Integer", ix );
> 
>                 // Tell the producer to send the message
>                 printf( "Sent message #%d from thread %s\n", ix+1,
> threadIdStr.c_str() );
>                 producer->send( message );
>                 producer2->send( message2 );
> 
>                 delete message;
>                 delete message2;
>             }
> 
>         }catch ( CMSException& e ) {
>             e.printStackTrace();
>         }
>     }
> 
> private:
> 
>     void cleanup(){
> 
>         // Destroy resources.
>         try{
>             if( destination != NULL ) delete destination;
>                                         delete destination2;
>         }catch ( CMSException& e ) { e.printStackTrace(); }
>         destination = NULL;
>         destination2 = NULL;
> 
>         try{
>             if( producer != NULL ) delete producer;
>                                         delete producer2;
>         }catch ( CMSException& e ) { e.printStackTrace(); }
>         producer = NULL; producer2 = NULL;
> 
>         // Close open resources.
>         try{
>             if( session != NULL ) session->close();
>             if( connection != NULL ) connection->close();
>         }catch ( CMSException& e ) { e.printStackTrace(); }
> 
>         try{
>             if( session != NULL ) delete session;
>         }catch ( CMSException& e ) { e.printStackTrace(); }
>         session = NULL;
> 
>         try{
>             if( connection != NULL ) delete connection;
>         }catch ( CMSException& e ) { e.printStackTrace(); }
>         connection = NULL;
>     }
> };
> 
> ////////////////////////////////////////////////////////////////////////////////
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
> 
>     std::cout << "=====================================================\n";
>     std::cout << "Starting the example:" << std::endl;
>     std::cout << "-----------------------------------------------------\n";
> 
>     // Set the URI to point to the IPAddress of your broker.
>     // add any optional params to the url to enable things like
>     // tightMarshalling or tcp logging etc.  See the CMS web site for
>     // a full list of configuration options.
>     //
>     //  http://activemq.apache.org/cms/
>     //
>     // Wire Format Options:
>     // =====================
>     // Use either stomp or openwire, the default ports are different for
> each
>     //
>     // Examples:
>     //    tcp://127.0.0.1:61616                      default to openwire
>     //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
>     //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
>     //
>     std::string brokerURI =
>         "tcp://localhost:61616"
>         "?wireFormat=openwire"
> //        "&connection.useAsyncSend=true"
> //        "&transport.commandTracingEnabled=true"
> //        "&transport.tcpTracingEnabled=true"
> //        "&wireFormat.tightEncodingEnabled=true"
>         ;
> 
>     //============================================================
>     // Total number of messages for this producer to send.
>     //============================================================
>     unsigned int numMessages = 20;
> 
>     //============================================================
>     // This is the Destination Name and URI options.  Use this to
>     // customize where the Producer produces, to have the producer
>     // use a topic or queue set the 'useTopics' flag.
>     //============================================================
>     std::string destURI = "TEST.FOO";
> 
>         std::string destURI2 = "TEST1.FOO";
>     //============================================================
>     // set to true to use topics instead of queues
>     // Note in the code above that this causes createTopic or
>     // createQueue to be used in the producer.
>     //============================================================
>     bool useTopics = true;
> 
>     // Pass an integer value to the producer for retry
>     unsigned int connectRetries = 0;
> 
>     if( argc > 1 ) {
>         try {
>             connectRetries = decaf::lang::Integer::parseInt( argv[1] );
>         } catch( decaf::lang::exceptions::NumberFormatException& ex ) {
>             connectRetries = 0;
>         }
>     }
> 
>     // Create the producer and run it.
>     SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
>     producer.setConnectRetries( connectRetries );
>     producer.run();
> 
> 
>         SimpleProducer producer2( brokerURI, numMessages, destURI2,
> useTopics );        producer2.setConnectRetries( connectRetries );
>         producer2.run();
> 
>     std::cout << "-----------------------------------------------------\n";
>     std::cout << "Finished with the example." << std::endl;
>     std::cout << "=====================================================\n";
> }
> 

-- 
Tim Bish
http://fusesource.com
http://timbish.blogspot.com/




Mime
View raw message