activemq-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Timothy Bish <tabish...@gmail.com>
Subject Re: The Decaf Threading API is in a Shutdown State. Exception
Date Wed, 16 Nov 2011 23:10:27 GMT
On Wed, 2011-11-16 at 12:53 -0800, mrdiesel wrote:
> I took exactly the same code from examples, but instead of asynchronous, i
> want synchronous client. So all i changed was remove tge listeners and
> return the message to runConsumer method.
> 
> I keep getting segmentation fault. 
> 
> Can you advise?
> 

You can also simplify your cleanup method as follows:

    void cleanup() {
        if (connection != NULL) {
            connection->close();
        }
        delete destination;
        destination = NULL;
        delete consumer;
        consumer = NULL;
        delete session;
        session = NULL;
        delete connection;
        connection = NULL;
    }

Regards
Tim.

> Below is the code I m running:
> #include <decaf/lang/Thread.h>
> #include <decaf/lang/Runnable.h>
> #include <decaf/util/concurrent/CountDownLatch.h>
> #include <activemq/core/ActiveMQConnectionFactory.h>
> #include <activemq/core/ActiveMQConnection.h>
> #include <activemq/transport/DefaultTransportListener.h>
> #include <activemq/library/ActiveMQCPP.h>
> #include <decaf/lang/Integer.h>
> #include <activemq/util/Config.h>
> #include <decaf/util/Date.h>
> #include <cms/Connection.h>
> #include <cms/Session.h>
> #include <cms/TextMessage.h>
> #include <cms/BytesMessage.h>
> #include <cms/MapMessage.h>
> #include <cms/ExceptionListener.h>
> #include <cms/MessageListener.h>
> #include <stdlib.h>
> #include <stdio.h>
> #include <iostream>
> 
> using namespace activemq;
> using namespace activemq::core;
> using namespace activemq::transport;
> using namespace decaf::lang;
> using namespace decaf::util;
> using namespace decaf::util::concurrent;
> using namespace cms;
> using namespace std;
> 
> ////////////////////////////////////////////////////////////////////////////////
> class YConsumer{
> private:
> 
>     Connection* connection;
>     Session* session;
>     Destination* destination;
>     MessageConsumer* consumer;
>     bool useTopic;
>     std::string brokerURI;
>     std::string destURI;
>     bool clientAck;
> 
> public:
> 
>      YConsumer( const std::string& brokerURI,
>                          const std::string& destURI,
>                          bool useTopic = false,
>                          bool clientAck = false ) :
>         connection(NULL),
>         session(NULL),
>         destination(NULL),
>         consumer(NULL),
>         useTopic(useTopic),
>         brokerURI(brokerURI),
>         destURI(destURI),
>         clientAck(clientAck) {
>     }
> 
> 
>     virtual ~YConsumer() throw() {
>         this->cleanup();
>     }
> 
>     void close() {
>         this->cleanup();
>     }
> 
>     string runConsumer() {
> 
>         try {
>          
>             auto_ptr<ConnectionFactory>
> connectionFactory(ConnectionFactory::createCMSConnectionFactory( brokerURI )
> );
>  
>             connection = connectionFactory->createConnection();
>  
>             connection->start();
> 
>  
>             connection->start();
>  
> 
> 	    if( clientAck ) {
>                 session = connection->createSession(
> Session::CLIENT_ACKNOWLEDGE );
>             } else {
>                 session = connection->createSession(
> Session::AUTO_ACKNOWLEDGE );
>             }
>  
>             if( useTopic ) {
>                 destination = session->createTopic( destURI );
>             } else {
>                 destination = session->createQueue( destURI );
>             }
>  
>             consumer = session->createConsumer( destination );
>  
> 	    Message* message = consumer->receive();
>  
> 	    TextMessage* textMessage = dynamic_cast<TextMessage* >( message );
>  	
> 	    string text = "";
>             
> 
> 	    if( textMessage != NULL ) {
>                 text = textMessage->getText();
>             } else {
>                 text = "NOT A TEXTMESSAGE!";
>             }
>      
> 	    //std::cout<<text.c_str()&lt;&lt;std::endl;
>             //delete message;
> 	    //delete textMessage;
>      
> 		return text.c_str();
>         } catch (CMSException&amp; e) {
>             e.printStackTrace();
>         }
>     }
> 
> private:
> 
>     void cleanup(){
> 
>         //*************************************************
>         // Always close destination, consumers and producers before
>         // you destroy their sessions and connection.
>         //*************************************************
>  
>         // Destroy resources.
>         try{
>             if( destination != NULL ) delete destination;
>         }catch (CMSException&amp; e) {e.printStackTrace();}
>         destination = NULL;
>  
>         try{
>             if( consumer != NULL ) delete consumer;
>         }catch (CMSException&amp; e) {e.printStackTrace();}
>         consumer = NULL;
>  
>         // Close open resources.
>         try{
>   
>             if( session != NULL ) session->close();
>             if( connection != NULL ) connection->close();
>         }catch (CMSException& e) {e.printStackTrace();}
>  
>         // Now Destroy them
>         try{
>             if( session != NULL ) delete session;
>         }catch (CMSException& e) {e.printStackTrace();}
>         session = NULL;
>  
>         try{
>             if( connection != NULL ) delete connection;
>         }catch (CMSException& e) {e.printStackTrace();}
>         connection = NULL;
>  
>     }
> };
> 
> ////////////////////////////////////////////////////////////////////////////////
> int main(int argc AMQCPP_UNUSED, char* argv[] AMQCPP_UNUSED) {
>  for(int i=0; i< 100;i++){
>     activemq::library::ActiveMQCPP::initializeLibrary();
>  
>     std::string brokerURI =
> "failover:(tcp://127.0.0.1:61613?wireFormat=stomp)";
>         
>     std::string destURI = "TEST.Prototype";  
>  
>     bool useTopics = false;
>  
>     bool clientAck = false;
>  
>     YConsumer consumer( brokerURI, destURI, useTopics, clientAck );
>   
>     string response = consumer.runConsumer();
>    
>     std::cout<< response<<std::endl;
>   
>     consumer.close();
>  
>     //activemq::library::ActiveMQCPP::shutdownLibrary();
> } 
> }
> 
> 
> 
> --
> View this message in context: http://activemq.2283324.n4.nabble.com/The-Decaf-Threading-API-is-in-a-Shutdown-State-Exception-tp4077689p4077852.html
> Sent from the ActiveMQ - User mailing list archive at Nabble.com.

-- 
Tim Bish
------------
FuseSource
Email: tim.bish@fusesource.com
Web: http://fusesource.com
Twitter: tabish121
Blog: http://timbish.blogspot.com/




Mime
View raw message