activemq-dev mailing list archives

From Matevz Tadel <matevz.ta...@cern.ch>
Subject [amq-cpp] cms::ExceptionListener::onException() and what one can do there
Date Fri, 08 Mar 2013 02:22:02 GMT
Hi,

I'm using a single connection to an AMQ topic in a multi-threaded program that 
does many other things besides sending data over AMQ. So when an AMQ 
onException() fires, I really do not want to exit the application (all the demos 
seem to have "// exit(1);" in this handler :) ). I would rather reinitialize 
my single connection, even if this means leaking some resources.

I have been getting exceptions like the one below since switching to 3.5.0 (but 
the load on the application is also getting higher, so this is not necessarily a 
change introduced in 3.5.0):
2013-03-06 01:51:51 ERR XrdFileCloseReporterAmq::onException Exception callback invoked:
     DataInputStream::readLong - Reached EOF
and the application now crashes when I close the connection (previously, closing 
it triggered a true exception being thrown in my application thread that makes 
the AMQ calls).

Please see my amq_connect() and onException() implementations below. The full 
class is available here:
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.h?revision=2860&view=markup
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.cxx?revision=2862&view=markup

I guess the question is: is there anything sane one can do after 
onException() is invoked, or does this actually mean that the internal state of 
amq-cpp is corrupted beyond repair? If that is indeed the case, my only option 
is to separate the sending of AMQ messages from the main application.

Best,
Matevz


void XrdFileCloseReporterAmq::amq_connect()
{
   static const Exc_t _eh("XrdFileCloseReporterAmq::amq_connect ");

   TString uri;

   // 2012-07 Failover and Stomp don't splice ... or, they splice too well, making
   // as many threads as ulimit lets them on first error ... thrashing the machine.
   // uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)", mAmqHost.Data(), mAmqPort);

   uri.Form("tcp://%s:%hu?wireFormat=stomp", mAmqHost.Data(), mAmqPort);

   try
   {
     auto_ptr<cms::ConnectionFactory> conn_fac
       (new activemq::core::ActiveMQConnectionFactory(uri.Data(),
                                                      mAmqUser.Data(),
                                                      mAmqPswd.Data()));

     mConn = conn_fac->createConnection();
     mConn->setExceptionListener(this);
     mConn->start();
   }
   catch (cms::CMSException& e)
   {
     throw _eh + "Exception during connection creation: " + e.getStackTraceString();
   }

   try
   {
     mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
     mDest = mSess->createTopic(mAmqTopic.Data());
     mProd = mSess->createProducer(mDest);
     // Copied from examples, NFI.
     mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT);
   }
   catch (cms::InvalidDestinationException& e)
   {
     throw _eh + "Invalid destination exception during producer creation: " +
       e.getStackTraceString();
   }
   catch (cms::CMSException& e)
   {
     throw _eh + "Exception during session, topic or message producer initialization: " +
       e.getStackTraceString();
   }
}


void XrdFileCloseReporterAmq::onException(const cms::CMSException &e)
{
   static const Exc_t _eh("XrdFileCloseReporterAmq::onException ");

   if (*mLog)
       mLog->Form(ZLog::L_Error, _eh, "Exception callback invoked:\n    %s",
		 e.getStackTraceString().c_str());

   cms::Connection *conn = mConn;
   mConn = 0;

   // This somehow results in proper exception getting thrown in the amq thread.
   // Go figure ... zen engineering or just a bug?
   //
   // 2013-03-07: Argh ... and this apparently crashes with activemq-cpp-3.5.0!
   // What to do?
   // Use mConn == 0 in main thread as a signal things should be reopened.
   // But do not call close :)
   if (conn)
   {
     conn->close();
     delete conn;
   }
}
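
The alternative noted in the comments above (use a flag as a signal to the main
thread that things should be reopened, and do not call close() from the
callback) could be sketched roughly as follows. This is only an illustration
of the bookkeeping, not a tested fix: FakeConnection and
DeferredReconnectReporter are hypothetical stand-ins so that the sketch
compiles without activemq-cpp, and whether closing the old connection from the
application thread is actually safe in 3.5.0 is an assumption, and is exactly
the open question.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <string>

// Hypothetical stand-in for cms::Connection; not part of activemq-cpp.
struct FakeConnection
{
   bool closed = false;
   void close() { closed = true; }
};

// Hypothetical reporter that shows only the reconnect bookkeeping.
class DeferredReconnectReporter
{
   std::unique_ptr<FakeConnection> mConn;
   std::atomic<bool>               mNeedsReconnect{false};
   int                             mConnectCount = 0;

public:
   // Application thread: (re)create the connection.
   // unique_ptr::reset() deletes the previous connection, if any.
   void connect()
   {
      mConn.reset(new FakeConnection);
      ++mConnectCount;
   }

   // Library thread: only raise the flag. No close(), no delete here.
   void onException()
   {
      mNeedsReconnect.store(true);
   }

   // Application thread: reopen first if the flag was raised, then send.
   // Returns true if a reconnect happened before this send.
   bool send(const std::string& /*payload*/)
   {
      bool reconnected = false;
      if (mNeedsReconnect.exchange(false))
      {
         // Assumption: closing here, outside the callback, is safe.
         // Skipping close() and leaking the old connection is the fallback.
         if (mConn) mConn->close();
         connect();
         reconnected = true;
      }
      assert(mConn && "connect() must be called before send()");
      // A real implementation would hand the payload to the producer here.
      return reconnected;
   }

   int connectCount() const { return mConnectCount; }
};
```

With this arrangement the callback never touches the connection object, so any
teardown happens on the same thread that makes the other AMQ calls. The cost is
that a failure is only noticed on the next send.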


