qpid-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jason Jones <jason-jo...@verizon.net>
Subject How do you stop a subscription manager with a busy message listener
Date Sat, 09 Jan 2010 00:01:51 GMT
I am having a problem stopping a busy message listener that is busy.  
When I attempt to stop subscription/subscription manager for a message 
listener that is processing messages from a queue with a high rate of 
message traffic I get random coring.

I have created a message listener thats runs in a separate thread 
created by calling SubscriptionManager::start().   When I call 
SubscriptionManager::stop() from  another thread my process cores.  If 
the message listener is idle then my program will not.

I discovered this problem as I was trying to figure out how to manually 
acknowledge a message from the broker.  Below I have included the code 
that I wrote.

To reproduce the problem do the following:

    * Create a direct exchange named "TestExchange"
    * Create a queue named "TestQueue"
    * Bind TestQueue to TestExchange with the binding key "binding_key"
    * Run the sender program. (This will send one message to the
      TestExchange and then exit.
      $ ./sender
    * Run the receiver program, then after it starts receiving message
      press control+c to terminate it.
      $ ./receiver

It might be necessary to run the program a few times to get it to core.
Here is the stack trace:

#0  0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
(gdb) bt
#0  0x00007f3aa4829fb5 in raise () from /lib/libc.so.6
#1  0x00007f3aa482bbc3 in abort () from /lib/libc.so.6
#2  0x00007f3aa4822f09 in __assert_fail () from /lib/libc.so.6
#3  0x00007f3aa577cadd in qpid::sys::Mutex::lock (this=<value optimized 
out>)
    at ./qpid/sys/posix/Mutex.h:116
#4  0x00007f3aa5387e4e in qpid::client::Dispatcher::run (this=0x25c1da0)
    at ./qpid/sys/Mutex.h:44
#5  0x00007f3aa5773cca in runRunnable (p=0x2518)
    at qpid/sys/posix/Thread.cpp:35
#6  0x00007f3aa3b893ba in start_thread () from /lib/libpthread.so.0
#7  0x00007f3aa48dcfcd in clone () from /lib/libc.so.6
#8  0x0000000000000000 in ?? ()

I am running QPID along with my programs on Ubuntu 9.04.

Since I intentionally made the receiver program release all messages 
instead of accepting the message, the one message that I sent will 
remain in the queue indefinitely and the receiver will continually 
receive the message over and over again until you press control-c.

Can you tell me whether the code that I wrote in the method 
MyMessageHandler::stop() is calling the various qpid methods in correct 
sequence?


Thanks,
Jason Jones


receiver.cpp (This is the program that cores)
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/Subscription.h>
#include <qpid/client/SubscriptionManager.h>
#include <qpid/client/SubscriptionManager.h>

#include <exception>
#include <iostream>
#include <string>

#include <signal.h>
#include <time.h>

using namespace std;
using namespace qpid::client;

sig_atomic_t shutdown = 0;

void sigIntHandler(int signo)
{
    shutdown = 1;
}
 
class MyMessageHandler : public MessageListener
{
    public:
        MyMessageHandler();
        ~MyMessageHandler();
        void received(Message& message);
        void start(Session& session, const string& queue);
        void stop();
       
     private:
        MyMessageHandler(const MyMessageHandler&);
        MyMessageHandler& operator=(const MyMessageHandler&);
        Subscription subscription_;
        SubscriptionManager* subscriptionManager_;
};

MyMessageHandler::MyMessageHandler()
    : MessageListener(),
      subscription_(),
      subscriptionManager_(0)
{
}

MyMessageHandler::~MyMessageHandler()
{
    stop();
    delete subscriptionManager_;
    subscriptionManager_ = 0;
}

void
MyMessageHandler::received(Message& message)
{
    cout << "Received message" << endl;
    subscription_.acquire(message);
    subscription_.release(message);
}


void
MyMessageHandler::start(Session& session, const string& queue)
{
    SubscriptionSettings subscriptionSettings;

    subscriptionSettings.autoAck = 0;
    subscriptionSettings.acquireMode = ACQUIRE_MODE_PRE_ACQUIRED;
   
    try
    {
        subscriptionManager_ = new SubscriptionManager(session);
        subscriptionManager_->setAutoStop(true);
        subscription_ = subscriptionManager_->subscribe(*this, queue, 
subscriptionSettings, "jason");
        subscriptionManager_->start();
    }
    catch(...)
    {
        delete subscriptionManager_;
        subscriptionManager_ = 0;
        throw;
    }
}


void
MyMessageHandler::stop()
{
    if(0 != subscriptionManager_)
    {
        // I think order matters on these two lines
        subscriptionManager_->stop();
        subscription_.cancel();
        delete subscriptionManager_;
        subscriptionManager_ = 0;
    }
}
 
 
int main(int argc, char* argv[])
{
    Connection connection;
    Session session;
    string host = "localhost";
    int port = 5672;
    string exchange = "TestExchange";
    struct sigaction action = {0};
    struct timespec second = {1, 0};
    MyMessageHandler MyMessageHandler;
   
    action.sa_handler = sigIntHandler;
    action.sa_flags = SA_RESTART;
   
    sigaction(SIGINT, &action, 0);
   
    try
    {
        connection.open(host, port);
        session = connection.newSession();
        cout << "Connected to broker" << endl;
        MyMessageHandler.start(session, string("TestQueue"));
       
        while(0 == shutdown)
        {
            nanosleep(&second, 0);
        }
        cout << "Shutting down" << endl;
        MyMessageHandler.stop();
        session.close();
        connection.close();
    }
    catch(const exception& e)
    {
        cerr << "ERROR: " << e.what() << endl;
    }
   
    return 0;
}




sender.cpp
#include <qpid/client/Connection.h>
#include <qpid/client/Session.h>
#include <qpid/client/Message.h>

#include <exception>
#include <iostream>
#include <string>


using namespace std;
using namespace qpid::client;


int main(int argc, char* argv[])
{
    Connection connection;
    Session session;
    string host = "eagle";
    int port = 5672;
    string exchange = "TestExchange";
    Message message;
   
    try
    {
        connection.open(host, port);
        session = connection.newSession();
       
        message.getDeliveryProperties().setRoutingKey("routing_key");
        message.setData("Hello World");
       
        cout << "Sending message" << endl;
        session.messageTransfer(exchange, 1, 0, message);
        cout << "Message sent" << endl;
       
        session.close();
        connection.close();
    }
    catch(const exception& e)
    {
        cerr << "ERROR: " << e.what() << endl;
    }
   
    return 0;
}



Makefile
CC = g++
INCLUDES = -I.

SENDER_EXEC = sender
SENDER_MAIN = sender.cpp
RECEIVER_EXEC = receiver
RECEIVER_MAIN = receiver.cpp

LIBRARIES = -lqpidcommon \
            -lqpidclient

all: linkSender linkReceiver

linkSender:
    $(CC) -o $(SENDER_EXEC) $(SENDER_MAIN) $(INCLUDES) $(LIBRARIES)

linkReceiver:
    $(CC) -o $(RECEIVER_EXEC) $(RECEIVER_MAIN) $(INCLUDES) $(LIBRARIES)

clean:
    rm -f ../lib/${LIBRARY} ${OBJECTS} ${SENDER_EXEC}



Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message