activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mrdiesel <mrdie...@gmail.com>
Subject Re: The Decaf Threading API is in a Shutdown State. Exception
Date Wed, 16 Nov 2011 20:53:38 GMT
I took exactly the same code from examples, but instead of asynchronous, i
want synchronous client. So all i changed was remove tge listeners and
return the message to runConsumer method.

I keep getting segmentation fault. 

Can you advise?

Below is the code I m running:
#include <decaf/lang/Thread.h>
#include <decaf/lang/Runnable.h>
#include <decaf/util/concurrent/CountDownLatch.h>
#include <activemq/core/ActiveMQConnectionFactory.h>
#include <activemq/core/ActiveMQConnection.h>
#include <activemq/transport/DefaultTransportListener.h>
#include <activemq/library/ActiveMQCPP.h>
#include <decaf/lang/Integer.h>
#include <activemq/util/Config.h>
#include <decaf/util/Date.h>
#include <cms/Connection.h>
#include <cms/Session.h>
#include <cms/TextMessage.h>
#include <cms/BytesMessage.h>
#include <cms/MapMessage.h>
#include <cms/ExceptionListener.h>
#include <cms/MessageListener.h>
#include <stdlib.h>
#include <stdio.h>
#include <iostream>

using namespace activemq;
using namespace activemq::core;
using namespace activemq::transport;
using namespace decaf::lang;
using namespace decaf::util;
using namespace decaf::util::concurrent;
using namespace cms;
using namespace std;

////////////////////////////////////////////////////////////////////////////////
class YConsumer{
private:

    Connection* connection;
    Session* session;
    Destination* destination;
    MessageConsumer* consumer;
    bool useTopic;
    std::string brokerURI;
    std::string destURI;
    bool clientAck;

public:

     YConsumer( const std::string& brokerURI,
                         const std::string& destURI,
                         bool useTopic = false,
                         bool clientAck = false ) :
        connection(NULL),
        session(NULL),
        destination(NULL),
        consumer(NULL),
        useTopic(useTopic),
        brokerURI(brokerURI),
        destURI(destURI),
        clientAck(clientAck) {
    }


    virtual ~YConsumer() throw() {
        this->cleanup();
    }

    void close() {
        this->cleanup();
    }

    string runConsumer() {

        try {
         
            auto_ptr<ConnectionFactory>
connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
);
 
            connection = connectionFactory->createConnection();
 
            connection->start();

 
            connection->start();
 
 
	    if( clientAck ) {
                session = connection->createSession(
Session::CLIENT_ACKNOWLEDGE );
            } else {
                session = connection->createSession(
Session::AUTO_ACKNOWLEDGE );
            }
 
            if( useTopic ) {
                destination = session->createTopic( destURI );
            } else {
                destination = session->createQueue( destURI );
            }
 
            consumer = session->createConsumer( destination );
 
	    Message* message = consumer->receive();
 
	    TextMessage* textMessage = dynamic_cast<TextMessage* >( message );
 	
	    string text = "";
            

	    if( textMessage != NULL ) {
                text = textMessage->getText();
            } else {
                text = "NOT A TEXTMESSAGE!";
            }
     
	    //std::cout<<text.c_str()&lt;&lt;std::endl;
            //delete message;
	    //delete textMessage;
     
		return text.c_str();
        } catch (CMSException&amp; e) {
            e.printStackTrace();
        }
    }

private:

    void cleanup(){

        //*************************************************
        // Always close destination, consumers and producers before
        // you destroy their sessions and connection.
        //*************************************************
 
        // Destroy resources.
        try{
            if( destination != NULL ) delete destination;
        }catch (CMSException&amp; e) {e.printStackTrace();}
        destination = NULL;
 
        try{
            if( consumer != NULL ) delete consumer;
        }catch (CMSException&amp; e) {e.printStackTrace();}
        consumer = NULL;
 
        // Close open resources.
        try{
  
            if( session != NULL ) session->close();
            if( connection != NULL ) connection->close();
        }catch (CMSException& e) {e.printStackTrace();}
 
        // Now Destroy them
        try{
            if( session != NULL ) delete session;
        }catch (CMSException& e) {e.printStackTrace();}
        session = NULL;
 
        try{
            if( connection != NULL ) delete connection;
        }catch (CMSException& e) {e.printStackTrace();}
        connection = NULL;
 
    }
};

////////////////////////////////////////////////////////////////////////////////
int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
 for(int i=0; i< 100;i++){
    activemq::library::ActiveMQCPP::initializeLibrary();
 
    std::string brokerURI =
"failover:(tcp://127.0.0.1:61613?wireFormat=stomp)";
        
    std::string destURI = "TEST.Prototype";  
 
    bool useTopics = false;
 
    bool clientAck = false;
 
    YConsumer consumer( brokerURI, destURI, useTopics, clientAck );
  
    string response = consumer.runConsumer();
   
    std::cout<< response<<std::endl;
  
    consumer.close();
 
    //activemq::library::ActiveMQCPP::shutdownLibrary();
} 
}



--
View this message in context: http://activemq.2283324.n4.nabble.com/The-Decaf-Threading-API-is-in-a-Shutdown-State-Exception-tp4077689p4077852.html
Sent from the ActiveMQ - User mailing list archive at Nabble.com.

Mime
View raw message