qpid-users mailing list archives

From "Gang, Litao" <litao.g...@jpmorgan.com>
Subject when a message is considered delivered? How deliver and deliver only once is guaranteed?
Date Tue, 17 Jan 2012 22:28:45 GMT
Hello there,

We are using Qpid 0.8 (C++). We have a typical client/server setup in which the server
takes requests from a queue and processes them, using the Qpid callback mechanism. The
problem is that from time to time a bad request shows up and crashes our server during
processing, yet the request still remains on the queue. Another instance of the server
then picks up the same request and crashes in turn, leaving our servers unusable. Is
there a way to work around this, or what mistake have I made here?

What the server does is fairly complicated, and it uses some legacy and third-party
libraries, so it is not easy to prevent it from crashing. Crashing once is fine as long
as the subsequent good requests are processed normally. But what we see is that the bad
request is picked up by the next server, and we cannot get out of that state.

Ideally, once the messaging layer delivers the message to the application level, the
message itself would be considered consumed. Whatever bad things happen in the
application code should not matter much. After all, what Qpid does should be delivery
only: once the message is delivered, its job is done. Why do we still see the message
sitting on the queue?

So my question is: when is a message in a queue marked as consumed? Ideally it should be
right before the received() callback is invoked, but from what I see, it looks like it is not.

As a workaround, can I call some routine to tell the broker to remove the message from the queue?
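
To make the workaround question concrete, here is a minimal, self-contained sketch of the
two possible orderings: acknowledging a request after processing it (which matches what we
seem to observe) versus acknowledging it before processing. It does not use the Qpid API at
all; ToyQueue, AckOrder, runServer, runWithRestarts and crashOnBad are hypothetical names
made up for illustration, and a thrown exception stands in for a server crash.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <functional>
#include <stdexcept>
#include <string>

// Hypothetical toy model, NOT the Qpid API: a queue that keeps a message
// until the consumer explicitly accepts it.
class ToyQueue {
public:
    void publish(const std::string& body) { pending_.push_back(body); }

    // Look at the front message without removing it.
    const std::string& peek() const {
        assert(!pending_.empty());
        return pending_.front();
    }

    // The consumer acknowledges the front message; only now is it removed.
    void accept() {
        assert(!pending_.empty());
        pending_.pop_front();
    }

    bool empty() const { return pending_.empty(); }
    std::size_t size() const { return pending_.size(); }

private:
    std::deque<std::string> pending_;
};

enum class AckOrder { AfterProcessing, BeforeProcessing };

// Runs one "server instance". A std::runtime_error thrown by `process`
// stands in for a crash: the instance stops and reports how many requests
// it completed before that point.
inline int runServer(ToyQueue& q, AckOrder order,
                     const std::function<void(const std::string&)>& process) {
    int done = 0;
    try {
        while (!q.empty()) {
            std::string request = q.peek();  // copy: accept() invalidates the reference
            if (order == AckOrder::BeforeProcessing) q.accept();
            process(request);
            if (order == AckOrder::AfterProcessing) q.accept();
            ++done;
        }
    } catch (const std::runtime_error&) {
        // Simulated crash: any message not yet accepted stays on the queue.
    }
    return done;
}

// Starts new server instances until the queue drains or `maxRestarts` is
// reached. Returns the total number of requests processed successfully.
inline int runWithRestarts(ToyQueue& q, AckOrder order, int maxRestarts,
                           const std::function<void(const std::string&)>& process) {
    int total = 0;
    for (int i = 0; i < maxRestarts && !q.empty(); ++i)
        total += runServer(q, order, process);
    return total;
}

// Hypothetical request handler that "crashes" on one particular input.
inline void crashOnBad(const std::string& request) {
    if (request == "bad") throw std::runtime_error("simulated crash");
}
```

In this model, with the requests "good1", "bad", "good2" and up to five restarts, the
AfterProcessing ordering gets stuck on the bad request with two messages still queued,
while BeforeProcessing drops the bad request and drains the queue. The trade-off is that
acknowledging first would also lose a good request if the server died for an unrelated reason.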

Thank you very much for your help, really appreciate it!



In qpid::client::Dispatcher:

void Dispatcher::run()
{
     Mutex::ScopedLock l(lock);
     if (running)
         throw Exception("Dispatcher is already running.");
     state_saver<bool>  reset(running); // Reset to false on exit.
     running = true;
     try {
        while (!queue->isClosed()) {
             Mutex::ScopedUnlock u(lock);
             FrameSet::shared_ptr content = queue->pop();
             if (content->isA<MessageTransferBody>()) {
                 Message msg(new MessageImpl(*content));
                 boost::intrusive_ptr<SubscriptionImpl> listener = find(msg.getDestination());
                 if (!listener) {
                     QPID_LOG(error, "No listener found for destination " << msg.getDestination());
                 } else {
                     assert(listener);
                     listener->received(msg);
                 }
             } else {
                 if (handler.get()) {
                     handler->handle(*content);
                 } else {
                     QPID_LOG(warning, "No handler found for " << *(content->getMethod()));
                 }
             }
         }
         session.sync(); // Make sure all our acks are received before returning.
     }
     catch (const ClosedException&) {
         QPID_LOG(debug, QPID_MSG(session.getId() << ": closed by peer"));
     }
     catch (const TransportFailure&) {
         QPID_LOG(info, QPID_MSG(session.getId() << ": transport failure"));
         throw;
     }
     catch (const std::exception& e) {
         if ( failoverHandler ) {
             QPID_LOG(debug, QPID_MSG(session.getId() << " failover: " << e.what()));
             failoverHandler();
         } else {
             QPID_LOG(error, session.getId() << " error: " << e.what());
             throw;
         }
     }
}

Inside qpid::client::SubscriptionImpl::received() :

void SubscriptionImpl::received(Message& m) {
     Mutex::ScopedLock l(lock);
     MessageImpl& mi = *MessageImpl::get(m);
     if (mi.getMethod().getAcquireMode() == ACQUIRE_MODE_NOT_ACQUIRED)
         unacquired.add(m.getId());
     else if (mi.getMethod().getAcceptMode() == ACCEPT_MODE_EXPLICIT)
         unaccepted.add(m.getId());
     if (listener) {
         Mutex::ScopedUnlock u(lock);
         // <-- If the application crashes here, what happens? Is the message
         //     already considered consumed and removed from the queue, or not?
         listener->received(m);
     }
     if (settings.completionMode == COMPLETE_ON_DELIVERY) {
         manager.getSession().markCompleted(m.getId(), false, false);
     }
     if (settings.autoAck) {
         if (unaccepted.size() >= settings.autoAck) {
             async(manager.getSession()).messageAccept(unaccepted);
             switch (settings.completionMode) {
               case COMPLETE_ON_ACCEPT:
                 manager.getSession().markCompleted(unaccepted, true);
                 break;
               case COMPLETE_ON_DELIVERY:
                 manager.getSession().sendCompletion();
                 break;
               default://do nothing
                 break;
             }
             unaccepted.clear();
         }
     }
}
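
Reading the code above, my understanding (which may be wrong) is that with explicit accept
mode the message id is recorded in `unaccepted` before the listener runs, and
`messageAccept` is only sent afterwards, once `unaccepted.size()` reaches
`settings.autoAck`. The following self-contained sketch models just that bookkeeping.
ToySubscription, crashOn and deliverUntilCrash are hypothetical names, not Qpid classes,
and an exception stands in for the crash.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <set>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for the subscription bookkeeping quoted above.
// It models only the order of operations, not the real Qpid classes.
struct ToySubscription {
    std::size_t autoAck;                 // 0 disables automatic accepts
    std::set<int> unaccepted;            // delivered but not yet accepted
    std::vector<int> acceptedByBroker;   // ids covered by an accept that was sent
    std::function<void(int)> listener;   // application callback

    ToySubscription(std::size_t n, std::function<void(int)> l)
        : autoAck(n), listener(std::move(l)) {}

    void received(int id) {
        unaccepted.insert(id);           // recorded before the callback runs
        listener(id);                    // may throw (stand-in for a crash)
        // Reached only if the listener returned normally.
        if (autoAck && unaccepted.size() >= autoAck) {
            acceptedByBroker.insert(acceptedByBroker.end(),
                                    unaccepted.begin(), unaccepted.end());
            unaccepted.clear();
        }
    }
};

// Builds a listener that throws when it sees `badId`.
inline std::function<void(int)> crashOn(int badId) {
    return [badId](int id) {
        if (id == badId) throw std::runtime_error("simulated crash");
    };
}

// Delivers ids 1..n in order and stops at the first simulated crash.
// Returns the ids that were delivered but never accepted.
inline std::set<int> deliverUntilCrash(ToySubscription& sub, int n) {
    assert(n >= 0);
    try {
        for (int id = 1; id <= n; ++id) sub.received(id);
    } catch (const std::runtime_error&) {
        // The process would be gone here; no accept is sent for `unaccepted`.
    }
    return sub.unaccepted;
}
```

In this model, with autoAck set to 1 and a crash on the third message, only id 3 is left
unaccepted. With autoAck set to 3, ids 1, 2 and 3 are all left unaccepted even though 1 and
2 were processed. Whether the real broker redelivers exactly that set is an assumption I
have not verified.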



