activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From manua <agarwal.m...@gmail.com>
Subject Re: how to implement multiple message producers in activemq cpp
Date Tue, 17 Mar 2009 15:24:03 GMT

Hi Tim,

I have pasted the main content of my file here.

Actually, I wanted to have a single session and multiple message producers
in that session. Each producer will be sending a different message to its
respective topic/queue.

Within a single program, if I create a separate connection and a separate
session for each producer, the program works fine, but it has issues with
multiple message producers in a single session.


class SimpleProducer : public Runnable {
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    Destination* destination2;
    MessageProducer* producer;
    MessageProducer* producer2;
    bool useTopic;
    bool clientAck;
    unsigned int numMessages;
    std::string brokerURI;
    std::string destURI;
    std::string destURI2;
    unsigned int connectRetries;

public:

    SimpleProducer( const std::string& brokerURI,
                    unsigned int numMessages,
                    const std::string& destURI,
                    bool useTopic = false,
                    bool clientAck = false,
                    unsigned int connectRetries = 0 ){
        connection = NULL;
        session = NULL;
        destination = NULL;
        producer = NULL;
        this->numMessages = numMessages;
        this->useTopic = useTopic;
        this->brokerURI = brokerURI;
        this->destURI = destURI;
        this->clientAck = clientAck;
        this->connectRetries = 0;
    }

    virtual ~SimpleProducer(){
        cleanup();
    }

    void setConnectRetries( unsigned int retries ) {
        this->connectRetries = retries;
    }
    unsigned int getConnectRetries() const {
        return this->connectRetries;
    }

    virtual void run() {
        try {
            // Create a ConnectionFactory
            auto_ptr<ActiveMQConnectionFactory> connectionFactory(
                new ActiveMQConnectionFactory( brokerURI ) );

            unsigned int retries = this->connectRetries;
            do{
                // Create a Connection
                try{
                    connection = connectionFactory->createConnection();
                    connection->start();
                } catch( CMSException& e ) {
                    e.printStackTrace();

                    if( retries == 0 ) {
                        return;
                    }
                }
            } while( retries-- != 0 );

            // Create a Session
            if( clientAck ) {
                session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
            }

            // Create the destination (Topic or Queue)
            if( useTopic ) {
                destination = session->createTopic( destURI );
                destination2 = session->createTopic( destURI2 ); 
            } else {
                destination = session->createQueue( destURI );
                destination2 = session->createQueue( destURI2 );
            }

            // Create a MessageProducer from the Session to the Topic or
Queue
            producer = session->createProducer( destination );
            producer2 = session->createProducer( destination2 );

            producer->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            producer2->setDeliveryMode( DeliveryMode::NON_PERSISTENT );

            // Create the Thread Id String
            string threadIdStr = Integer::toString( Thread::getId() );

            // Create a messages
            string text = (string)"Hello world! from thread " + threadIdStr;

                string text2 = (string)"Message for second producer ";

            for( unsigned int ix=0; ix<numMessages; ++ix ){
                TextMessage* message = session->createTextMessage( text );

                TextMessage* message2 = session->createTextMessage( text2 );

                message->setIntProperty( "Integer", ix );

                // Tell the producer to send the message
                printf( "Sent message #%d from thread %s\n", ix+1,
threadIdStr.c_str() );
                producer->send( message );
                producer2->send( message2 );

                delete message;
                delete message2;
            }

        }catch ( CMSException& e ) {
            e.printStackTrace();
        }
    }

private:

    void cleanup(){

        // Destroy resources.
        try{
            if( destination != NULL ) delete destination;
                                        delete destination2;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        destination = NULL;
        destination2 = NULL;

        try{
            if( producer != NULL ) delete producer;
                                        delete producer2;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        producer = NULL; producer2 = NULL;

        // Close open resources.
        try{
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch ( CMSException& e ) { e.printStackTrace(); }

        try{
            if( session != NULL ) delete session;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        session = NULL;

        try{
            if( connection != NULL ) delete connection;
        }catch ( CMSException& e ) { e.printStackTrace(); }
        connection = NULL;
    }
};

////////////////////////////////////////////////////////////////////////////////
/**
 * Entry point of the example: builds the broker URI and destination names,
 * then runs two SimpleProducer instances one after the other, the first on
 * "TEST.FOO" and the second on "TEST1.FOO".
 *
 * An optional first command-line argument gives the number of connection
 * retries; a missing, non-numeric or negative value means no retries.
 */
int main(int argc, char* argv[]) {

    std::cout << "=====================================================\n";
    std::cout << "Starting the example:" << std::endl;
    std::cout << "-----------------------------------------------------\n";

    // Set the URI to point to the IPAddress of your broker.
    // add any optional params to the url to enable things like
    // tightMarshalling or tcp logging etc.  See the CMS web site for
    // a full list of configuration options.
    //
    //  http://activemq.apache.org/cms/
    //
    // Wire Format Options:
    // =====================
    // Use either stomp or openwire, the default ports are different for each
    //
    // Examples:
    //    tcp://127.0.0.1:61616                      default to openwire
    //    tcp://127.0.0.1:61616?wireFormat=openwire  same as above
    //    tcp://127.0.0.1:61613?wireFormat=stomp     use stomp instead
    //
    std::string brokerURI =
        "tcp://localhost:61616"
        "?wireFormat=openwire"
//        "&connection.useAsyncSend=true"
//        "&transport.commandTracingEnabled=true"
//        "&transport.tcpTracingEnabled=true"
//        "&wireFormat.tightEncodingEnabled=true"
        ;

    //============================================================
    // Total number of messages for this producer to send.
    //============================================================
    unsigned int numMessages = 20;

    //============================================================
    // This is the Destination Name and URI options.  Use this to
    // customize where the Producer produces, to have the producer
    // use a topic or queue set the 'useTopics' flag.
    //============================================================
    std::string destURI = "TEST.FOO";
    std::string destURI2 = "TEST1.FOO";

    //============================================================
    // set to true to use topics instead of queues
    // Note in the code above that this causes createTopic or
    // createQueue to be used in the producer.
    //============================================================
    bool useTopics = true;

    // Pass an integer value to the producer for retry
    unsigned int connectRetries = 0;

    if( argc > 1 ) {
        try {
            // A negative value would wrap to a huge unsigned retry count,
            // so anything below zero is treated as "no retries".
            const int parsed = decaf::lang::Integer::parseInt( argv[1] );
            connectRetries = parsed > 0 ? static_cast<unsigned int>( parsed ) : 0;
        } catch( decaf::lang::exceptions::NumberFormatException& ex ) {
            connectRetries = 0;
        }
    }

    // Create the first producer and run it.
    SimpleProducer producer( brokerURI, numMessages, destURI, useTopics );
    producer.setConnectRetries( connectRetries );
    producer.run();

    // Create the second producer, targeting the second destination, and run it.
    SimpleProducer producer2( brokerURI, numMessages, destURI2, useTopics );
    producer2.setConnectRetries( connectRetries );
    producer2.run();

    std::cout << "-----------------------------------------------------\n";
    std::cout << "Finished with the example." << std::endl;
    std::cout << "=====================================================\n";

    return 0;
}






Timothy Bish wrote:
> 
> The file you posted appears to be corrupted, you may want to try that
> again or you can open a Jira issue and attach it there.
> 
> Regards
> Tim.
> 
> On Mon, 2009-03-16 at 21:07 -0700, manua wrote:
>> I tried to copy paste the same code, but when I tried to run it, i get
>> the
>> following error,
>> 
>> [root@domU-12-31-39-03-00-D3 examples]# ./simple_producer
>> =====================================================
>> Starting the example:
>> -----------------------------------------------------
>> No valid response received for command: Begin Class = ProducerInfo
>>  Value of ProducerInfo::ID_PRODUCERINFO = 6
>>  Value of ProducerId is Below:
>> Begin Class = ProducerId
>>  Value of ProducerId::ID_PRODUCERID = 123
>>  Value of ConnectionId = de41a612-6662-4270-935d-f807987dab85
>>  Value of Value = 1
>>  Value of SessionId = 0
>> No Data for Class BaseDataStructure
>> End Class = ProducerId
>> 
>>  Value of Destination is Below:
>> Begin Class = ActiveMQTopic
>> Begin Class = ActiveMQDestination
>>  Value of exclusive = false
>>  Value of ordered = false
>>  Value of advisory = false
>>  Value of orderedTarget = coordinator
>>  Value of physicalName =
>>  Value of options = Begin Class activemq::util::Properties:
>> End Class activemq::util::Properties:
>> 
>> No Data for Class BaseDataStructure
>> End Class = ActiveMQDestination
>> End Class = ActiveMQTopic
>> 
>>  Value of DispatchAsync = 0
>>  Value of WindowSize = 0
>> Begin Class = BaseCommand
>> No Data for Class BaseDataStructure
>>   Response Required = 1
>>   Command Id = 4
>> End Class = BaseCommand
>> End Class = ProducerInfo
>> , check broker.
>>         FILE: activemq/transport/filters/ResponseCorrelator.cpp, LINE:
>> 112
>>         FILE: activemq/transport/filters/ResponseCorrelator.cpp, LINE:
>> 120
>>         FILE: activemq/connector/openwire/OpenWireFormatNegotiator.cpp,
>> LINE: 107
>>         FILE: activemq/connector/openwire/OpenWireConnector.cpp, LINE:
>> 1533
>>         FILE: activemq/connector/openwire/OpenWireConnector.cpp, LINE:
>> 650
>>         FILE: activemq/core/ActiveMQSession.cpp, LINE: 358
>> No valid response received for command: Begin Class = ProducerInfo
>>  Value of ProducerInfo::ID_PRODUCERINFO = 6
>>  Value of ProducerId is Below:
>> Begin Class = ProducerId
>>  Value of ProducerId::ID_PRODUCERID = 123
>>  Value of ConnectionId = de41a612-b259-5970-935d-f807987dab85
>>  Value of Value = 1
>>  Value of SessionId = 0
>> No Data for Class BaseDataStructure
>> End Class = ProducerId
>> 
>>  Value of Destination is Below:
>> Begin Class = ActiveMQTopic
>> Begin Class = ActiveMQDestination
>>  Value of exclusive = false
>>  Value of ordered = false
>>  Value of advisory = false
>>  Value of orderedTarget = coordinator
>>  Value of physicalName =
>>  Value of options = Begin Class activemq::util::Properties:
>> End Class activemq::util::Properties:
>> 
>> No Data for Class BaseDataStructure
>> End Class = ActiveMQDestination
>> End Class = ActiveMQTopic
>> 
>>  Value of DispatchAsync = 0
>>  Value of WindowSize = 0
>> Begin Class = BaseCommand
>> No Data for Class BaseDataStructure
>>   Response Required = 1
>>   Command Id = 4
>> End Class = BaseCommand
>> End Class = ProducerInfo
>> , check broker.
>>         FILE: activemq/transport/filters/ResponseCorrelator.cpp, LINE:
>> 112
>>         FILE: activemq/transport/filters/ResponseCorrelator.cpp, LINE:
>> 120
>>         FILE: activemq/connector/openwire/OpenWireFormatNegotiator.cpp,
>> LINE: 107
>>         FILE: activemq/connector/openwire/OpenWireConnector.cpp, LINE:
>> 1533
>>         FILE: activemq/connector/openwire/OpenWireConnector.cpp, LINE:
>> 650
>>         FILE: activemq/core/ActiveMQSession.cpp, LINE: 358
>> -----------------------------------------------------
>> Finished with the example.
>> =====================================================
>> [root@domU-12-31-39-03-00-D3 examples]#
>> 
>> Please find my SimpleProducer.cpp file attached with this message.
>> 
>> I modified the consumer code in a similar fashion as well, but it is
>> still not
>> working.
>> Kindly look into the code and tell me your views. Thanks for your help.
>> 
>> Regards,
>> Manu 
>> 
>> 
>> 
>> Timothy Bish wrote:
>> > 
>> > On Mon, 2009-03-16 at 13:02 -0700, manua wrote:
>> >> I tried to implement multiple topics in activemq cpp. I modified the
>> >> example,
>> >> simpleProducer.cpp to have multiple producers and then created
>> multiple
>> >> destinations (multiple topics). With each producer pointing to a
>> >> different
>> >> topic, I am trying to send a different message. It compiles fine, but at
>> >> run
>> >> time it gives a segmentation fault and the error "no response received
>> >> for ... ProducerInfo".
>> >> 
>> >> Any help/(sample code) in this direction will be highly appreciated.
>> > 
>> > There isn't anything that should prevent you from creating multiple
>> > Producers, it should be as simple as cut and paste from the sample
>> code.
>> > You can post your modified source code and we can take a look and see
>> if
>> > there's something going on there.
>> > 
>> > Regards
>> > Tim.
>> > 
>> > 
>> > -- 
>> > Tim Bish
>> > http://fusesource.com
>> > http://timbish.blogspot.com/
>> > 
>> > 
>> > 
>> > 
>> > 
>> http://www.nabble.com/file/p22552277/SimpleProducer.cpp
>> SimpleProducer.cpp 
> -- 
> Tim Bish
> http://fusesource.com
> http://timbish.blogspot.com/
> 
> 
> 
> 
> 

-- 
View this message in context: http://www.nabble.com/how-to-implement-multiple-message-producers-in-activemq-cpp-tp22544601p22561588.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.


Mime
View raw message