qpid-users mailing list archives

From Xiong Zou <zouxi...@gmail.com>
Subject [Client] warning Session was not closed cleanly
Date Thu, 16 Jan 2014 02:04:16 GMT
Hi All,

I am experimenting with the qpid Pub-Sub pattern, following the sample code
in examples/old_api/pub-sub. My qpid version is 0.20.

Sometimes, when I stop my binary, I receive a warning like the sample below:
2014-01-16 08:58:33 [Client] warning Session was not closed cleanly: anonymous.fa805922-8f1b-4e67-9710-a3da31e9bfc8

Please also note that in my class destructor I cannot delete the
listener. If I uncomment the line //delete pListner, I get the
following stack trace:

Program terminated with signal 6, Aborted.
#0  0x00000030fe2302c5 in raise () from /lib64/libc.so.6
(gdb) bt
#0  0x00000030fe2302c5 in raise () from /lib64/libc.so.6
#1  0x00000030fe231d70 in abort () from /lib64/libc.so.6
#2  0x00002b6987115bd1 in ~Mutex (this=0x5c4b6f0, __in_chrg=<value optimized out>) at ../include/qpid/sys/posix/Mutex.h:112
#3  qpid::client::Dispatcher::~Dispatcher (this=0x5c4b6f0, __in_chrg=<value optimized out>) at ./qpid/client/Dispatcher.h:66
#4  0x00002b69871340f2 in qpid::client::SubscriptionManagerImpl::~SubscriptionManagerImpl (this=0x5c4b6b0, __in_chrg=<value optimized out>) at qpid/client/SubscriptionManagerImpl.cpp:51
#5  0x00002b6987131948 in dtor (this=0x5c4ac08, __in_chrg=<value optimized out>) at ./qpid/RefCounted.h:42
#6  qpid::client::SubscriptionManager::~SubscriptionManager (this=0x5c4ac08, __in_chrg=<value optimized out>) at qpid/client/SubscriptionManager.cpp:35
#7  0x0000000000806895 in IPC::CListener::~CListener (this=0x5c4abf0, __in_chrg=<value optimized out>) at CQPIDPubSubAdaptor.cpp:78

I have posted my code below. Could you please help me check what I have
done wrong?

namespace IPC
{
        class CListener : public qpid::client::MessageListener
        {
                private:
                        CMsgQueue&                           m_helpQueue;
                        qpid::client::Session&               m_session;
                        qpid::client::SubscriptionManager    m_subscriptions;
                        std::vector< std::string >           m_vecQueueUIDs;
                public:
                        CListener(qpid::client::Session& session, CMsgQueue& queue) :
                                        m_helpQueue(queue),
                                        m_session(session),
                                        m_subscriptions(session)
                        {
                        };
                        void prepareQueue(std::string queue, std::string exchange, std::string routing_key)
                        {
                                /* Create a unique queue name for this consumer by concatenating
                                 * the queue name parameter with the Session ID.
                                 */

                                queue += m_session.getId().getName();
                                DEBUG << "Declaring queue: " << queue << ENDL;

                                /* Declare an exclusive queue on the broker
                                 */

                                m_session.queueDeclare(qpid::client::arg::queue=queue, qpid::client::arg::exclusive=true, qpid::client::arg::autoDelete=true);

                                m_session.exchangeBind(qpid::client::arg::exchange=exchange, qpid::client::arg::queue=queue, qpid::client::arg::bindingKey=routing_key);

                                /*
                                 * subscribe to the queue using the subscription manager.
                                 */

                                DEBUG << "Subscribing to queue " << queue << ENDL;
                                m_subscriptions.subscribe(*this, queue);

                                m_vecQueueUIDs.push_back(queue);
                        }
                        void received(qpid::client::Message& message)
                        {
                                DEBUG << "Message: " << message.getData() << " from " << message.getDestination() << ENDL;
                                char* pData = NULL;
                                unsigned int iLen = message.getData().length();
                                if(iLen > 0)
                                {
                                        pData         = new char[iLen+1];
                                        strncpy(pData, message.getData().c_str(), iLen);
                                        pData[iLen] = 0;
                                        m_helpQueue.enqueue(pData, iLen);
                                }
                        }
                        void start(const CIPCConfig& config);
                        void stop();
                        ~CListener() { };
        };
        void CListener::start(const CIPCConfig& config)
        {

                for(unsigned int i = 0; i < config.getNumAddressRcvrs(); i++)
                {
                        DEBUG << "create qpid session susbcribe routing key " << config.getAddressRcvr(i) << ENDL;
                        prepareQueue(config.getAddressRcvr(i), IPC::cs_strQPIDExchgAMQTopic, config.getAddressRcvr(i));
                }
                //always listen to Commands for control messages.
                prepareQueue(IPC::cs_strQPIDKeyRouteAdminCmd, IPC::cs_strQPIDExchgAMQTopic, IPC::cs_strQPIDKeyRouteAdminCmd);

                INFO << "run Listner Thread" << ENDL;
                m_subscriptions.start();
        }
        void CListener::stop()
        {
                //No need to cancel subscriptions with m_subscriptions.stop().
                //for(unsigned int i; i < m_vecQueueUIDs.size(); i++)
                //{
                //      if(!m_vecQueueUIDs[i].empty())
                //              m_subscriptions.cancel(m_vecQueueUIDs[i]);
                //}
                // stop the spawned thread
                m_subscriptions.stop();
        }

};
IPC::CQPIDPubSubAdaptor::CQPIDPubSubAdaptor()
                :   m_pHelpQueue(NULL),
                    m_pSession(NULL),
                    m_pConListner(NULL),
                    m_bIsValid(false),
                        m_iWaitTime(-1)
{
}

IPC::CQPIDPubSubAdaptor::~CQPIDPubSubAdaptor()
{
        INFO << "start to clear up QPIDPubSubAdaptor" << ENDL;
        if(m_pConListner)
        {
                CListener* pListner = (CListener*)m_pConListner;
                pListner->stop();
                //delete pListner;     //cannot delete listener, will cause ~Mutex() to abort (signal 6)
                m_pConListner = NULL;
        }

        m_connection.close();

        if(m_pSession)
        {
                delete m_pSession;
                m_pSession = NULL;
        }

        if(m_pHelpQueue)
        {
                delete m_pHelpQueue;
                m_pHelpQueue = NULL;
        }
}

bool IPC::CQPIDPubSubAdaptor::init(const CIPCConfig& config)
{
        if(m_bIsValid)
                return true;

        try
        {
                m_connection.open(config.getHost().c_str(), atoi(config.getPort().c_str()));
                m_pSession = new qpid::client::Session(m_connection.newSession());

                m_sSessionName = config.getHost() + ":" + config.getPort();
                //might need to create the session with a name for later usage.
                DEBUG << "create qpid session with " << m_sSessionName << ENDL;

                m_bIsValid = true;

                m_iWaitTime = config.getWaitTime();

                //TODO: there will be multiple susbcribe routing key
                m_sAddressSndr = config.getAddressSender();
                DEBUG << "create qpid session publish routing key " << m_sAddressSndr << ENDL;

                m_pHelpQueue = new CFFSPSCQueueAdaptor();
                m_pHelpQueue->init(config);

                //start the listner in another thread.
                IPC::CListener* pListener = new IPC::CListener(*m_pSession, *m_pHelpQueue);
                pListener->start(config);

                m_pConListner = (void*)pListener;

                return true;
        }
        catch(const std::exception& error)
        {
                ERROR << error.what() << ENDL;
        }

        m_bIsValid = false;

        return false;
}
unsigned int IPC::CQPIDPubSubAdaptor::enqueue(char* pData, unsigned int& iLen)
{
        if(!isValid()
                        || !pData
                        || iLen <= 0)
                return 0;

        DEBUG << "try to send message to sender address " << m_sAddressSndr << " with content " << pData << ENDL;

        try
        {
                //Do we need to create a new session each time a message is sent?
                qpid::client::Session l_session = m_connection.newSession();

                qpid::client::Message l_message;
                l_message.getDeliveryProperties().setRoutingKey(m_sAddressSndr);
                l_message.setData(pData);

                //amq.topic is the exchange for Pub/Sub pattern, can be hard-coded for now
                l_session.messageTransfer(qpid::client::arg::content=l_message, qpid::client::arg::destination=IPC::cs_strQPIDExchgAMQTopic);

                return iLen;
        }
        catch(const std::exception& error)
        {
                ERROR << error.what() << ENDL;
        }

        return 0;
}
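
One possible reading of the backtrace above, offered as an assumption rather
than a confirmed diagnosis: ~Mutex aborts because the mutex is destroyed while
the dispatcher thread started by m_subscriptions.start() may still be using
it, since stop() only requests the stop. The sketch below is not qpid code.
WorkerOwner and all of its members are hypothetical names that model the
general pattern of requesting a stop, waiting for the thread to finish, and
only then destroying the object.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <mutex>
#include <thread>

// Hypothetical stand-in, NOT a qpid class: an object that owns a mutex
// and a background thread that locks that mutex in a loop.
class WorkerOwner {
public:
    WorkerOwner() : running_(false), iterations_(0) {}

    // Spawn the background thread (plays the role of a start() call).
    void start() {
        running_ = true;
        worker_ = std::thread([this] { loop(); });
    }

    // Only requests the stop; the thread may still be inside loop().
    void stop() { running_ = false; }

    // Blocks until the thread has really exited.
    void wait() {
        if (worker_.joinable()) worker_.join();
    }

    // True when no thread is attached (never started, or already joined).
    bool finished() const { return !worker_.joinable(); }

    // Number of loop passes so far, read under the same mutex.
    int iterations() const {
        std::lock_guard<std::mutex> guard(lock_);
        return iterations_;
    }

    // Stop and join before the members (including the mutex) are destroyed.
    ~WorkerOwner() {
        stop();
        wait();
    }

private:
    void loop() {
        while (running_) {
            {
                std::lock_guard<std::mutex> guard(lock_);
                ++iterations_;
            }
            // Sleep outside the lock so other threads can take the mutex.
            std::this_thread::sleep_for(std::chrono::milliseconds(1));
        }
    }

    mutable std::mutex lock_;
    std::atomic<bool> running_;
    int iterations_;  // guarded by lock_
    std::thread worker_;
};
```

In qpid terms this would correspond to waiting for the subscription thread
after m_subscriptions.stop() and before delete pListner; whether
SubscriptionManager::wait() is the right call for that in 0.20 should be
checked against the headers. The "Session was not closed cleanly" warning may
similarly come from a session that goes out of scope without an explicit
close(), such as l_session in enqueue(); this is also an assumption.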

Thanks a lot in advance! Looking forward to your replies.




